feat(milvus): add instrument for AsyncMilvusClient #3950
jayvynl wants to merge 2 commits into traceloop:main
Conversation
Alex Lin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
📝 Walkthrough

Adds async support for Milvus instrumentation: introduces async-aware wrapper routing, implements an async tracing path mirroring the sync lifecycle, adds a runtime check for async client availability, expands pytest asyncio config, and adds comprehensive async integration tests.
Sequence Diagram(s)

sequenceDiagram
participant Client as AsyncMilvusClient
participant Wrap as _wrap (router)
participant AsyncTracer as _async_with_tracer
participant Tracer as OpenTelemetry Tracer
participant Metrics as Metrics Recorder
participant MilvusDB as Milvus Server
Client->>Wrap: call wrapped async method
Wrap->>Wrap: inspect.iscoroutinefunction()?
Wrap->>AsyncTracer: route to async path
AsyncTracer->>AsyncTracer: check _SUPPRESS_INSTRUMENTATION_KEY
AsyncTracer->>Tracer: start_as_current_span()
AsyncTracer->>AsyncTracer: set DB & method attributes
AsyncTracer->>MilvusDB: await execution
MilvusDB-->>AsyncTracer: result or exception
alt exception
AsyncTracer->>AsyncTracer: set error.type, record exception
AsyncTracer-->>Client: re-raise
else success
AsyncTracer->>AsyncTracer: record per-result events
AsyncTracer->>Metrics: record duration & response metrics
AsyncTracer-->>Client: return result
end
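The routing step in the diagram — dispatch to an async tracing path when the wrapped method is a coroutine function — can be sketched as follows. This is a minimal illustration, not the package's implementation: only the `_wrap` and `_async_with_tracer` names come from the diagram, and the `CALLS` list is a stand-in for the real OpenTelemetry span and metric recording.

```python
import asyncio
import inspect

CALLS = []  # stand-in for span/metric recording

def _async_with_tracer(func):
    async def wrapper(*args, **kwargs):
        CALLS.append(("async", func.__name__))  # start_as_current_span(...)
        try:
            return await func(*args, **kwargs)  # await execution
        except Exception:
            CALLS.append(("error", func.__name__))  # set error.type, record exception
            raise  # re-raise to the caller
    return wrapper

def _sync_with_tracer(func):
    def wrapper(*args, **kwargs):
        CALLS.append(("sync", func.__name__))
        return func(*args, **kwargs)
    return wrapper

def _wrap(func):
    # Route AsyncMilvusClient coroutine methods to the async tracing path;
    # everything else keeps the existing sync path.
    if inspect.iscoroutinefunction(func):
        return _async_with_tracer(func)
    return _sync_with_tracer(func)

# Hypothetical stand-ins for client methods.
async def search(query):
    return [query.upper()]

def insert(row):
    return 1

wrapped_search = _wrap(search)
wrapped_insert = _wrap(insert)
```

The key design point is that a single `_wrap` router keeps one registration table for both client classes, rather than maintaining parallel sync and async method lists.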
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/opentelemetry-instrumentation-milvus/tests/test_hybrid_search.py (1)
Lines 11-43: ⚠️ Potential issue | 🟠 Major

Add teardown to the hybrid-search fixture.

This fixture writes `my_hybrid_search` into a persistent `milvus.db` and returns immediately, so the collection survives the test. That leaks state into reruns and into the async hybrid-search suite, which reuses the same collection name and DB path.

🧹 Proposed fix
-    client.create_collection(collection_name=collection_name, schema=schema, index_params=index_params)
-    return collection_name
+    client.create_collection(collection_name=collection_name, schema=schema, index_params=index_params)
+    yield collection_name
+    client.drop_collection(collection_name=collection_name)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opentelemetry-instrumentation-milvus/tests/test_hybrid_search.py` around lines 11 - 43, The fixture named collection creates a persistent Milvus collection "my_hybrid_search" but never cleans it up; change the pytest fixture to a yield-style fixture that yields collection_name and after the yield calls client.drop_collection(collection_name) (and any required index cleanup via index_params or client APIs) to remove the collection from the persistent milvus.db so tests don't leak state into reruns or the async hybrid-search suite.
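As a runnable sketch of the yield-style fixture this prompt describes: `FakeClient` below is a hypothetical in-memory stand-in for `pymilvus.MilvusClient`, and the `@pytest.fixture` decorator is omitted so the setup/teardown order can be driven directly as a generator.

```python
class FakeClient:
    """Hypothetical stand-in for MilvusClient, tracking collections in memory."""
    def __init__(self):
        self.collections = set()

    def create_collection(self, collection_name, **kwargs):
        self.collections.add(collection_name)

    def drop_collection(self, collection_name):
        self.collections.discard(collection_name)

client = FakeClient()

# In the real test file this function carries @pytest.fixture; pytest runs
# the code before `yield` as setup and the code after it as teardown.
def collection():
    collection_name = "my_hybrid_search"
    client.create_collection(collection_name=collection_name)
    yield collection_name
    client.drop_collection(collection_name=collection_name)  # no leaked state
```

Because the drop runs after the yield, the collection is removed even on reruns, so the sync and async hybrid-search suites no longer see each other's leftover state.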
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py`:
- Around line 74-120: Update the declared minimum pymilvus version to >=2.6.0 so
the AsyncMilvusClient methods (create_collection, insert, upsert, delete,
search, get, query, hybrid_search) are actually supported: modify the version
spec in the _instruments mapping that lists package "pymilvus" and also bump the
pymilvus dependency in pyproject.toml (both dev and test dependency groups) from
>=2.4.1 to >=2.6.0.
In
`@packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py`:
- Around line 31-61: The hybrid_collection fixture creates a persistent
collection and doesn't clean it up; after yielding collection_name, call the
teardown to remove the collection (use client.drop_collection(collection_name)
or the appropriate async delete method) to ensure the on-disk DB is cleaned up;
modify the async fixture hybrid_collection so that immediately after the yield
it awaits the client.drop_collection/delete method and handle any NotFound/no-op
errors gracefully.
---
Outside diff comments:
In `@packages/opentelemetry-instrumentation-milvus/tests/test_hybrid_search.py`:
- Around line 11-43: The fixture named collection creates a persistent Milvus
collection "my_hybrid_search" but never cleans it up; change the pytest fixture
to a yield-style fixture that yields collection_name and after the yield calls
client.drop_collection(collection_name) (and any required index cleanup via
index_params or client APIs) to remove the collection from the persistent
milvus.db so tests don't leak state into reruns or the async hybrid-search
suite.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0e399242-c398-4997-ab76-a7fc30c26655
⛔ Files ignored due to path filters (1)
packages/opentelemetry-instrumentation-milvus/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/wrapper.py
- packages/opentelemetry-instrumentation-milvus/pyproject.toml
- packages/opentelemetry-instrumentation-milvus/tests/conftest.py
- packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py
- packages/opentelemetry-instrumentation-milvus/tests/test_hybrid_search.py
- packages/opentelemetry-instrumentation-milvus/tests/test_query.py
- packages/opentelemetry-instrumentation-milvus/tests/test_search.py
…collection fixture
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py`:
- Around line 150-153: The test currently iterates over metric families returned
by find_metrics_by_name(...) but doesn't assert that any families were found,
making the check vacuous; update the tests (around uses of
reader.get_metrics_data(), find_metrics_by_name(), and the variables like
distance_metrics) to first assert that the returned list is non-empty (e.g.,
assert distance_metrics), then proceed to loop and validate data points (repeat
the same non-empty assertion for the other metric checks at the locations
corresponding to the other loops mentioned). Ensure you reference the same
symbols (reader.get_metrics_data, find_metrics_by_name, Meters.* constants) when
adding the non-empty assertions so the tests fail if a metric family is missing.
- Around line 181-186: The test currently iterates all span.events and will pass
even if no DB_QUERY_RESULT events exist; modify the check to first filter
span.events for events where event.name == Events.DB_QUERY_RESULT.value, assert
that the resulting list has length 3 (since three brown rows/limit=3 are
expected), and then iterate that filtered list to validate each event's
attributes (color_tag is str and id is int). Locate the logic operating on
span.events and Events.DB_QUERY_RESULT.value in test_async_milvus_client.py and
apply this change to ensure the expected query-result events are required.
- Around line 127-128: The test looks up a "milvus.search" span from
exporter.get_finished_spans() which uses a session-scoped exporter, so the
lookup can return older spans and make assertions order-dependent; update the
tests in test_async_milvus_client.py (e.g., the lookups around
exporter.get_finished_spans() and the next(s for s in spans if s.name ==
"milvus.search") calls used in test_async_milvus_single_vector_search and the
other mentioned cases) to either clear the exporter state before each test or,
when selecting the span, iterate reversed(spans) and apply explicit filters for
span.name and operation/collection to pick the most recent matching span so
assertions are deterministic. Ensure every occurrence of
exporter.get_finished_spans() in this file uses one of these approaches.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f3dbefbc-608d-48c8-a7c2-ddcd854998c6
📒 Files selected for processing (4)
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/utils.py
- packages/opentelemetry-instrumentation-milvus/tests/conftest.py
- packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py
✅ Files skipped from review due to trivial changes (1)
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/utils.py
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/opentelemetry-instrumentation-milvus/tests/conftest.py
- packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py
    spans = exporter.get_finished_spans()
    span = next(s for s in spans if s.name == "milvus.search")
Select spans from an isolated exporter state.
exporter is session-scoped in packages/opentelemetry-instrumentation-milvus/tests/conftest.py, so these lookups scan the full accumulated span buffer. On Line 255, next(s for s in spans if s.name == "milvus.search") can pick the earlier success span once test_async_milvus_single_vector_search has already run, which makes the error-path assertion order-dependent. Clear the exporter before each test, or select from reversed(spans) with collection/operation filters.
Possible fix
+@pytest.fixture(autouse=True)
+def clear_exporter(exporter):
+ exporter.clear()
+ yield
+ exporter.clear()
+
...
- span = next(s for s in spans if s.name == "milvus.search")
+ span = next(s for s in reversed(spans) if s.name == "milvus.search")

Also applies to: 166-167, 192-193, 254-255, 301-302
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py`
around lines 127 - 128, The test looks up a "milvus.search" span from
exporter.get_finished_spans() which uses a session-scoped exporter, so the
lookup can return older spans and make assertions order-dependent; update the
tests in test_async_milvus_client.py (e.g., the lookups around
exporter.get_finished_spans() and the next(s for s in spans if s.name ==
"milvus.search") calls used in test_async_milvus_single_vector_search and the
other mentioned cases) to either clear the exporter state before each test or,
when selecting the span, iterate reversed(spans) and apply explicit filters for
span.name and operation/collection to pick the most recent matching span so
assertions are deterministic. Ensure every occurrence of
exporter.get_finished_spans() in this file uses one of these approaches.
There exists clear_exporter fixture in @packages/opentelemetry-instrumentation-milvus/tests/conftest.py
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check for clear_exporter fixture in conftest.py
rg -n "clear_exporter" packages/opentelemetry-instrumentation-milvus/tests/conftest.py
# Check if clear_exporter is used in test_async_milvus_client.py
rg -n "clear_exporter" packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py
# Show the clear_exporter fixture definition
rg -n -A5 "def clear_exporter" packages/opentelemetry-instrumentation-milvus/tests/conftest.py
Length of output: 306
🏁 Script executed:
#!/bin/bash
# Show full clear_exporter fixture with its decorator
rg -n -B3 -A5 "def clear_exporter" packages/opentelemetry-instrumentation-milvus/tests/conftest.py
# Check if test_async_milvus_client.py uses clear_exporter or clear_exporter_reader as a parameter
rg -n "def test_" packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py
Length of output: 732
@jayvynl, you're absolutely right — my apologies for the noise. The clear_exporter fixture in conftest.py is already decorated with @pytest.fixture(autouse=True), so it clears the exporter state before every test automatically. The span lookups are already isolated and deterministic. This was a false positive on my part.
(◕‸◕✿)
🧠 Learnings used
Learnt from: CR
Repo: traceloop/openllmetry PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-01-15T15:08:40.421Z
Learning: Applies to **/*.py : Use ConsoleSpanExporter from opentelemetry.sdk.trace.export for debugging OpenTelemetry spans and hierarchy issues
    metrics_data = reader.get_metrics_data()
    distance_metrics = find_metrics_by_name(metrics_data, Meters.DB_SEARCH_DISTANCE)
    for metric in distance_metrics:
        assert all(dp.sum >= 0 for dp in metric.data.data_points)
Assert that each metric family is actually present.
These loops are vacuous when find_metrics_by_name(...) returns an empty list, so the tests still pass if the async instrumentation stops recording the metric entirely. Add a non-empty assertion before each loop so this file really protects metric emission.
Possible fix
distance_metrics = find_metrics_by_name(metrics_data, Meters.DB_SEARCH_DISTANCE)
+ assert distance_metrics, f"expected {Meters.DB_SEARCH_DISTANCE} metrics"
for metric in distance_metrics:
assert all(dp.sum >= 0 for dp in metric.data.data_points)
...
duration_metrics = find_metrics_by_name(metrics_data, Meters.DB_QUERY_DURATION)
+ assert duration_metrics, f"expected {Meters.DB_QUERY_DURATION} metrics"
for metric in duration_metrics:
assert all(dp.sum >= 0 for dp in metric.data.data_points)
...
insert_metrics = find_metrics_by_name(metrics_data, Meters.DB_USAGE_INSERT_UNITS)
+ assert insert_metrics, f"expected {Meters.DB_USAGE_INSERT_UNITS} metrics"
for metric in insert_metrics:
assert all(dp.value == 1003 for dp in metric.data.data_points)
upsert_metrics = find_metrics_by_name(metrics_data, Meters.DB_USAGE_UPSERT_UNITS)
+ assert upsert_metrics, f"expected {Meters.DB_USAGE_UPSERT_UNITS} metrics"
for metric in upsert_metrics:
assert all(dp.value == 1 for dp in metric.data.data_points)
delete_metrics = find_metrics_by_name(metrics_data, Meters.DB_USAGE_DELETE_UNITS)
+ assert delete_metrics, f"expected {Meters.DB_USAGE_DELETE_UNITS} metrics"
for metric in delete_metrics:
    assert all(dp.value == 3 for dp in metric.data.data_points)

Also applies to: 176-179, 225-234
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py`
around lines 150 - 153, The test currently iterates over metric families
returned by find_metrics_by_name(...) but doesn't assert that any families were
found, making the check vacuous; update the tests (around uses of
reader.get_metrics_data(), find_metrics_by_name(), and the variables like
distance_metrics) to first assert that the returned list is non-empty (e.g.,
assert distance_metrics), then proceed to loop and validate data points (repeat
the same non-empty assertion for the other metric checks at the locations
corresponding to the other loops mentioned). Ensure you reference the same
symbols (reader.get_metrics_data, find_metrics_by_name, Meters.* constants) when
adding the non-empty assertions so the tests fail if a metric family is missing.
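The vacuous-loop hazard this prompt targets can be demonstrated with a small stand-in. The dict-based metric families, the metric name string, and `check_distance_metrics` below are all hypothetical; the real `find_metrics_by_name` helper operates on OpenTelemetry metrics data and `Meters.*` constants.

```python
def find_metrics_by_name(metrics_data, name):
    # Stand-in for the test helper: pick out matching metric families.
    return [m for m in metrics_data if m["name"] == name]

def check_distance_metrics(metrics_data):
    distance_metrics = find_metrics_by_name(metrics_data, "db.search.distance")
    # Without this guard, an empty result makes the loop below assert nothing,
    # and the test passes even when the instrumentation stops emitting metrics.
    assert distance_metrics, "expected db.search.distance metrics"
    for metric in distance_metrics:
        assert all(dp >= 0 for dp in metric["data_points"])
    return True
```

With the guard in place, a run that records no metric families fails loudly instead of passing by default.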
    for event in span.events:
        assert event.name == Events.DB_QUERY_RESULT.value
        tag = event.attributes.get("color_tag")
        _id = event.attributes.get("id")
        assert isinstance(tag, str)
        assert isinstance(_id, int)
Require the expected query-result events.
This setup always inserts exactly three brown rows (ids 1000-1002) and the query uses limit=3, but the loop on Lines 181-186 still passes when no DB_QUERY_RESULT events are emitted. Filter by Events.DB_QUERY_RESULT.value and assert len(events) == 3 before checking payload types.
Possible fix
- for event in span.events:
- assert event.name == Events.DB_QUERY_RESULT.value
+ events = [e for e in span.events if e.name == Events.DB_QUERY_RESULT.value]
+ assert len(events) == 3
+ for event in events:
tag = event.attributes.get("color_tag")
_id = event.attributes.get("id")
assert isinstance(tag, str)
     assert isinstance(_id, int)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@packages/opentelemetry-instrumentation-milvus/tests/test_async_milvus_client.py`
around lines 181 - 186, The test currently iterates all span.events and will
pass even if no DB_QUERY_RESULT events exist; modify the check to first filter
span.events for events where event.name == Events.DB_QUERY_RESULT.value, assert
that the resulting list has length 3 (since three brown rows/limit=3 are
expected), and then iterate that filtered list to validate each event's
attributes (color_tag is str and id is int). Locate the logic operating on
span.events and Events.DB_QUERY_RESULT.value in test_async_milvus_client.py and
apply this change to ensure the expected query-result events are required.
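The strict check this prompt asks for — filter first, require the expected count, then validate attribute types — can be sketched like this. The `Event` namedtuple and the `DB_QUERY_RESULT` string are hypothetical stand-ins for OpenTelemetry span events and `Events.DB_QUERY_RESULT.value`.

```python
from collections import namedtuple

Event = namedtuple("Event", ["name", "attributes"])
DB_QUERY_RESULT = "db.query.result"  # stand-in for Events.DB_QUERY_RESULT.value

def check_query_result_events(span_events, expected=3):
    # Filter first so missing events fail the count assertion instead of
    # silently skipping the attribute-validation loop.
    events = [e for e in span_events if e.name == DB_QUERY_RESULT]
    assert len(events) == expected
    for event in events:
        assert isinstance(event.attributes.get("color_tag"), str)
        assert isinstance(event.attributes.get("id"), int)
    return True
```

A span with no query-result events now fails at the length check rather than passing through an empty loop.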
`feat(instrumentation): ...` or `fix(instrumentation): ...`

Summary by CodeRabbit
New Features
Tests
Chores