diff --git a/examples/notebooks/publish/publish_batch.ipynb b/examples/notebooks/publish/publish_batch.ipynb new file mode 100644 index 0000000..9d2df4f --- /dev/null +++ b/examples/notebooks/publish/publish_batch.ipynb @@ -0,0 +1,452 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e524afa7", + "metadata": {}, + "source": [ + "# Batch Publishing\n", + "\n", + "This notebook demonstrates how to use the `DataStoreClient` to batch publish N iterations of data within a given `Step`. Rather than calling `publish_condition` or `publish_measurement` N times to publish data for each of these N iterations, this data can instead be published by a single call to `publish_condition_batch` or `publish_measurement_batch`, respectively. Batch publishing can help improve overall publishing performance.\n", + "\n", + "**Note:** These batching APIs handle batch publishing N iterations of data for a single condition or measurement with the specified name. They do *not* support publishing data across multiple (distinctly named) conditions or multiple (distinctly named) measurements at once." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6876aeb1", + "metadata": {}, + "outputs": [], + "source": [ + "# Perform example-specific setup with the DataStoreContext. This is not needed when writing production code.\n", + "from utilities import DataStoreContext\n", + "data_store_context = DataStoreContext()\n", + "data_store_context.initialize()\n", + "\n", + "# Create the TestResult and Step into which we will later batch publish conditions and measurements.\n", + "from ni.datastore.data import DataStoreClient, Step, TestResult\n", + "\n", + "data_store_client = DataStoreClient()\n", + "\n", + "test_result = TestResult(name=\"Batch Publish Example\")\n", + "test_result_id = data_store_client.create_test_result(test_result)\n", + "\n", + "step = Step(name=\"Example Step\", test_result_id=test_result_id)\n", + "step_id = data_store_client.create_step(step)\n", + "\n", + "print(f\"Created Test Result: {test_result_id}\")\n", + "print(f\"Created Step: {step_id}\")" + ] + }, + { + "cell_type": "markdown", + "id": "94b7bc49", + "metadata": {}, + "source": [ + "## Batch Publishing Condition Values\n", + "\n", + "The `publish_condition_batch` method of `DataStoreClient` can be used to publish the value of a given condition across N parametric iterations at once. This usage of `publish_condition_batch` is equivalent to calling `publish_condition` for that same condition N times.\n", + "\n", + "The condition values themselves may be supplied to `publish_condition_batch` as either an `Iterable` or a `Vector`.\n", + "\n", + "Supported element types of the `Iterable` are `float`, `int`, `str`, and `bool`: " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c7c5e72", + "metadata": {}, + "outputs": [], + "source": [ + "float_condition_id = data_store_client.publish_condition_batch(\n", + " name=\"Example Float Condition\",\n", + " condition_type=\"Setup\",\n", + " values=[1.25, 2.5, 3.75, 5.0],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "integer_condition_id = data_store_client.publish_condition_batch(\n", + " name=\"Example Integer Condition\",\n", + " condition_type=\"Setup\",\n", + " values=[1, 2, 3, 4],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "string_condition_id = data_store_client.publish_condition_batch(\n", + " name=\"Example String Condition\",\n", + " condition_type=\"Setup\",\n", + " values=[\"cold\", \"ambient\", \"warm\", \"hot\"],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "bool_condition_id = data_store_client.publish_condition_batch(\n", + " name=\"Example Bool Condition\",\n", + " condition_type=\"Setup\",\n", + " values=[True, False, True, False],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "print(f\"Published Example Float Condition ID: {float_condition_id}\")\n", + "print(f\"Published Example Integer Condition ID: {integer_condition_id}\")\n", + "print(f\"Published Example String Condition ID: {string_condition_id}\")\n", + "print(f\"Published Example Bool Condition ID: {bool_condition_id}\")" + ] + }, + { + "cell_type": "markdown", + "id": "1056ff7a", + "metadata": {}, + "source": [ + "Supplying a `Vector` allows the client to specify additional information, such as units:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "73e3a3ba", + "metadata": {}, + "outputs": [], + "source": [ + "from nitypes.vector import Vector\n", + "\n", + "condition_vector = Vector(values=[0.5, 1.0, 1.5, 2.0], units=\"Amps\")\n", + "\n", + "vector_condition_id = data_store_client.publish_condition_batch(\n", + " name=\"Example Published-As-Vector Condition\",\n", + " condition_type=\"Setup\",\n", + " values=condition_vector,\n", + " step_id=step_id,\n", + " )\n", + "\n", + "print(f\"Published Example Published-As-Vector Condition ID: {vector_condition_id}\")" + ] + }, + { + "cell_type": "markdown", + "id": "67347f3d", + "metadata": {}, + "source": [ + "Published condition values can be read back in the same manner as when reading back condition values that were published individually via `publish_condition`.\n", + "\n", + "More specifically, condition values published via `publish_condition_batch` are read back as a `Vector` containing all N iterations of parametric data published for a particular condition:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e72da95e", + "metadata": {}, + "outputs": [], + "source": [ + "published_float_condition = data_store_client.get_condition(float_condition_id)\n", + "published_integer_condition = data_store_client.get_condition(integer_condition_id)\n", + "published_string_condition = data_store_client.get_condition(string_condition_id)\n", + "published_bool_condition = data_store_client.get_condition(bool_condition_id)\n", + "published_as_vector_condition = data_store_client.get_condition(vector_condition_id)\n", + "\n", + "read_back_float_vector = data_store_client.read_condition_value(published_float_condition, expected_type=Vector)\n", + "read_back_integer_vector = data_store_client.read_condition_value(published_integer_condition, expected_type=Vector)\n", + "read_back_string_vector = data_store_client.read_condition_value(published_string_condition, expected_type=Vector)\n", + "read_back_bool_vector = data_store_client.read_condition_value(published_bool_condition, expected_type=Vector)\n", + "read_back_vector = data_store_client.read_condition_value(published_as_vector_condition, expected_type=Vector)\n", + "\n", + "print(f\"Read Example Float Condition: {read_back_float_vector}\")\n", + "print(f\"Read Example Integer Condition: {read_back_integer_vector}\")\n", + "print(f\"Read Example String Condition: {read_back_string_vector}\")\n", + "print(f\"Read Example Bool Condition: {read_back_bool_vector}\")\n", + "print(f\"Read Example Published-As-Vector Condition: {read_back_vector}\")" + ] + }, + { + "cell_type": "markdown", + "id": "5bf5cab8", + "metadata": {}, + "source": [ + "## Batch Publishing Measurement Values\n", + "\n", + "The `publish_measurement_batch` method of `DataStoreClient` can similarly be used to publish the value of a given measurement across N parametric iterations at once. This usage of `publish_measurement_batch` is equivalent to calling `publish_measurement` for that same measurement N times.\n", + "\n", + "This is conceptually similar to the batch publishing of condition values. One important note, however, is that whereas conditions only support a given publish iteration containing a single scalar value, measurements support both **scalar** and **non-scalar** values.\n", + "\n", + "Examples of supported non-scalar types are `AnalogWaveform` and `Vector`. The process of batch publishing both scalar and non-scalar measurement values is similar, though as is the case for publishing scalar and non-scalar measurement values using the non-batched `publish_measurement` method, the process of later reading that measurement data from Measurement Data Services is slightly different between the two cases, as shown below.\n", + "\n", + "### Scalar Measurements\n", + "\n", + "Scalar measurement values may be supplied to `publish_measurement_batch` as either an `Iterable` or a `Vector`.\n", + "\n", + "Supported element types of the `Iterable` are `float`, `int`, `str`, and `bool`: " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "63c8e0b5", + "metadata": {}, + "outputs": [], + "source": [ + "float_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example Float Measurement\",\n", + " values=[0.125, 0.25, 0.5, 1.0],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "integer_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example Integer Measurement\",\n", + " values=[10, 20, 30, 40],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "string_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example String Measurement\",\n", + " values=[\"nominal\", \"warning\", \"critical\", \"retest\"],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "bool_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example Bool Measurement\",\n", + " values=[False, False, True, True],\n", + " step_id=step_id,\n", + " )\n", + "\n", + "print(f\"Published Example Float Measurement IDs: {float_measurement_ids}\")\n", + "print(f\"Published Example Integer Measurement IDs: {integer_measurement_ids}\")\n", + "print(f\"Published Example String Measurement IDs: {string_measurement_ids}\")\n", + "print(f\"Published Example Bool Measurement IDs: {bool_measurement_ids}\")" + ] + }, + { + "cell_type": "markdown", + "id": "50daa8b9", + "metadata": {}, + "source": [ + "Note that because these are scalar measurements, only a single measurement ID is present in the returned `Sequence` from each publish call. For non-scalar measurements, the `publish_measurement_batch` method returns a `Sequence` of N IDs, as we will see further below.\n", + "\n", + "Supplying a `Vector` allows the client to specify additional information, such as units:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0148ddd", + "metadata": {}, + "outputs": [], + "source": [ + "measurement_vector = Vector(values=[1.2, 1.4, 1.6, 1.8], units=\"Volts\")\n", + "\n", + "vector_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example Published-As-Vector Measurement\",\n", + " values=measurement_vector,\n", + " step_id=step_id,\n", + " )\n", + "\n", + "print(f\"Published Example Published-As-Vector Measurement IDs: {vector_measurement_ids}\")" + ] + }, + { + "cell_type": "markdown", + "id": "2194f526", + "metadata": {}, + "source": [ + "Published measurement values can be read back in the same manner as when reading back measurement values that were published individually via `publish_measurement`.\n", + "\n", + "More specifically, scalar measurement values published via `publish_measurement_batch` are read back as a `Vector` containing all N iterations of parametric data published for a particular measurement:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e3514a41", + "metadata": {}, + "outputs": [], + "source": [ + "published_float_measurement = data_store_client.get_measurement(float_measurement_ids[0])\n", + "published_integer_measurement = data_store_client.get_measurement(integer_measurement_ids[0])\n", + "published_string_measurement = data_store_client.get_measurement(string_measurement_ids[0])\n", + "published_bool_measurement = data_store_client.get_measurement(bool_measurement_ids[0])\n", + "published_as_vector_measurement = data_store_client.get_measurement(vector_measurement_ids[0])\n", + "\n", + "read_back_float_measurement = data_store_client.read_measurement_value(published_float_measurement, expected_type=Vector)\n", + "read_back_integer_measurement = data_store_client.read_measurement_value(published_integer_measurement, expected_type=Vector)\n", + "read_back_string_measurement = data_store_client.read_measurement_value(published_string_measurement, expected_type=Vector)\n", + "read_back_bool_measurement = data_store_client.read_measurement_value(published_bool_measurement, expected_type=Vector)\n", + "read_back_vector_measurement = data_store_client.read_measurement_value(published_as_vector_measurement, expected_type=Vector)\n", + "\n", + "print(f\"Read Float Measurement: {read_back_float_measurement}\")\n", + "print(f\"Read Integer Measurement: {read_back_integer_measurement}\")\n", + "print(f\"Read String Measurement: {read_back_string_measurement}\")\n", + "print(f\"Read Bool Measurement: {read_back_bool_measurement}\")\n", + "print(f\"Read Published-As-Vector Measurement: {read_back_vector_measurement}\")" + ] + }, + { + "cell_type": "markdown", + "id": "90d45ba7", + "metadata": {}, + "source": [ + "### Non-Scalar Measurements\n", + "\n", + "Unlike condition values, a measurement value for a particular publish iteration may be non-scalar, such as an `AnalogWaveform` or `Vector`.\n", + "\n", + "To batch publish non-scalar values for a particular measurement, supply `publish_measurement_batch` with an `Iterable` containing N non-scalar elements. Each of the N elements in the `Iterable` will correspond to a single publish iteration. This is equivalent to calling the non-batched `publish_measurement` method N times for the measurement in question:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3a9fa541", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import timezone\n", + "import hightime as ht\n", + "import numpy as np\n", + "from nitypes.waveform import AnalogWaveform, Timing\n", + "\n", + "# Batch publish two AnalogWaveform (i.e., non-scalar) values for the Example AnalogWaveform Measurement.\n", + "waveforms = [\n", + " AnalogWaveform(\n", + " sample_count=4,\n", + " raw_data=np.array([0.0, 0.25, 0.5, 0.75], dtype=np.float64),\n", + " timing=Timing.create_with_regular_interval(\n", + " ht.timedelta(seconds=1e-3),\n", + " ht.datetime.now(timezone.utc),\n", + " ),\n", + " ),\n", + " AnalogWaveform(\n", + " sample_count=4,\n", + " raw_data=np.array([1.0, 0.85, 0.65, 0.4], dtype=np.float64),\n", + " timing=Timing.create_with_regular_interval(\n", + " ht.timedelta(seconds=1e-3),\n", + " ht.datetime.now(timezone.utc),\n", + " ),\n", + " ),\n", + "]\n", + "\n", + "waveform_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example AnalogWaveform Measurement\",\n", + " values=waveforms,\n", + " step_id=step_id,\n", + " )\n", + "\n", + "# Batch publish two Vector (i.e., non-scalar) values for the Example Vector Measurement.\n", + "vector_measurements = [\n", + " Vector(values=[1.0, 1.25, 1.5], units=\"Volts\"),\n", + " Vector(values=[2.0, 2.25, 2.5], units=\"Volts\"),\n", + "]\n", + "\n", + "vector_measurement_ids = data_store_client.publish_measurement_batch(\n", + " name=\"Example Vector Measurement\",\n", + " values=vector_measurements,\n", + " step_id=step_id,\n", + " )\n", + "\n", + "print(f\"Published Example AnalogWaveform Measurement IDs: {waveform_measurement_ids}\")\n", + "print(f\"Published Example Vector Measurement IDs: {vector_measurement_ids}\")" + ] + }, + { + "cell_type": "markdown", + "id": "e9ce1e23", + "metadata": {}, + "source": [ + "Published non-scalar measurement values can be read back in the same manner as when reading back measurement values that were published individually via `publish_measurement`.\n", + "\n", + "More specifically, non-scalar measurement values published via `publish_measurement_batch` are read back **individually**, with each published value corresponding to a separate `PublishedMeasurement`.\n", + "\n", + "The `parametric_index` of each `PublishedMeasurement` indicates the publish iteration to which it corresponds:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4596e9a9", + "metadata": {}, + "outputs": [], + "source": [ + "published_waveform_measurement_0 = data_store_client.get_measurement(waveform_measurement_ids[0])\n", + "published_waveform_measurement_1 = data_store_client.get_measurement(waveform_measurement_ids[1])\n", + "published_vector_measurement_0 = data_store_client.get_measurement(vector_measurement_ids[0])\n", + "published_vector_measurement_1 = data_store_client.get_measurement(vector_measurement_ids[1])\n", + "\n", + "read_back_waveform_measurement_0 = data_store_client.read_measurement_value(published_waveform_measurement_0, expected_type=AnalogWaveform)\n", + "read_back_waveform_measurement_1 = data_store_client.read_measurement_value(published_waveform_measurement_1, expected_type=AnalogWaveform)\n", + "read_back_vector_measurement_0 = data_store_client.read_measurement_value(published_vector_measurement_0, expected_type=Vector)\n", + "read_back_vector_measurement_1 = data_store_client.read_measurement_value(published_vector_measurement_1, expected_type=Vector)\n", + "\n", + "# The 'parametric_index' field of the PublishedMeasurement indicates\n", + "# which publish iteration the non-scalar measurement value corresponds to.\n", + "# In this example, we published two values for each (uniquely named) measurement,\n", + "# so the PublishedMeasurement corresponding to the first publish for each measurement \n", + "# has a 'parametric_index' of 0 and the PublishedMeasurement corresponding to the\n", + "# second publish for each measurement has a 'parametric_index' of 1.\n", + "print(\n", + " f\"{published_waveform_measurement_0.name} at Parametric Index \"\n", + " f\"{published_waveform_measurement_0.parametric_index}: \"\n", + " f\"{read_back_waveform_measurement_0.raw_data.tolist()}\"\n", + " )\n", + "print(\n", + " f\"{published_waveform_measurement_1.name} at Parametric Index \"\n", + " f\"{published_waveform_measurement_1.parametric_index}: \"\n", + " f\"{read_back_waveform_measurement_1.raw_data.tolist()}\"\n", + " )\n", + "\n", + "print(\n", + " f\"{published_vector_measurement_0.name} at Parametric Index \"\n", + " f\"{published_vector_measurement_0.parametric_index}: \"\n", + " f\"{read_back_vector_measurement_0}\"\n", + " )\n", + "print(\n", + " f\"{published_vector_measurement_1.name} at Parametric Index \"\n", + " f\"{published_vector_measurement_1.parametric_index}: \"\n", + " f\"{read_back_vector_measurement_1}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "6492dcaf", + "metadata": {}, + "source": [ + "## Clean-Up\n", + "\n", + "Close the `DataStoreClient` and tear down the example-specific context when done." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b80c18f6", + "metadata": {}, + "outputs": [], + "source": [ + "data_store_client.close()\n", + "\n", + "# Perform example-specific cleanup. This is not needed when writing production code.\n", + "data_store_context.close()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/poetry.lock b/poetry.lock index b33877a..dbd3a2c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.4.1 and should not be changed by hand. [[package]] name = "alabaster" @@ -1408,7 +1408,7 @@ fqdn = {version = "*", optional = true, markers = "extra == \"format-nongpl\""} idna = {version = "*", optional = true, markers = "extra == \"format-nongpl\""} isoduration = {version = "*", optional = true, markers = "extra == \"format-nongpl\""} jsonpointer = {version = ">1.13", optional = true, markers = "extra == \"format-nongpl\""} -jsonschema-specifications = ">=2023.03.6" +jsonschema-specifications = ">=2023.3.6" referencing = ">=0.28.4" rfc3339-validator = {version = "*", optional = true, markers = "extra == \"format-nongpl\""} rfc3986-validator = {version = ">0.1.0", optional = true, markers = "extra == \"format-nongpl\""} @@ -2447,36 +2447,36 @@ protobuf = ">=4.21" [[package]] name = "ni-measurements-data-v1-client" -version = "1.1.0.dev0" +version = "1.1.0.dev1" description = "gRPC Client for NI Data Store Service" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "ni_measurements_data_v1_client-1.1.0.dev0-py3-none-any.whl", hash = "sha256:d41f93ff1584461ef45dd4ea25c3ceaf763da53c0a2cc5e48c64f675e4ba3c00"}, - {file = "ni_measurements_data_v1_client-1.1.0.dev0.tar.gz", hash = "sha256:3043ef784d6dec4f476f3065e781ce8a1f2db38900c3a0ef5d90d6c08a9fa466"}, + {file = "ni_measurements_data_v1_client-1.1.0.dev1-py3-none-any.whl", hash = "sha256:556ea5e16dc9121c678bf4047c4f911226028f0880b0f64a97408e7863d8426c"}, + {file = "ni_measurements_data_v1_client-1.1.0.dev1.tar.gz", hash = "sha256:b006257e5fccd7842cd3f93b787be5f84a8ccf2af9b76dbec537ae4d139d4226"}, ] [package.dependencies] ni-measurementlink-discovery-v1-client = ">=1.1.0" -ni-measurements-data-v1-proto = ">=1.1.0.dev0" +ni-measurements-data-v1-proto = ">=1.1.0.dev1" [[package]] name = "ni-measurements-data-v1-proto" -version = "1.1.0.dev0" +version = "1.1.0.dev1" description = "Protobuf data types and service stubs for NI data store gRPC APIs" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "ni_measurements_data_v1_proto-1.1.0.dev0-py3-none-any.whl", hash = "sha256:c1e5ad669ab978c7202f58fac7eda2e19be2a6fcc4b07bc13f1904a5aad43809"}, - {file = "ni_measurements_data_v1_proto-1.1.0.dev0.tar.gz", hash = "sha256:99102d9e785031ce0797efa435fce975e9a0de8547b76079aaa7e35c871e7da4"}, + {file = "ni_measurements_data_v1_proto-1.1.0.dev1-py3-none-any.whl", hash = "sha256:a76ef8079f6d66d0ba3c577f20babacf8a15be196a343b5fa54b465c215de268"}, + {file = "ni_measurements_data_v1_proto-1.1.0.dev1.tar.gz", hash = "sha256:656220c42dcb94ef9d360bb19c1f852d215cd34299adb67fdbf782c7484ec53b"}, ] [package.dependencies] ni-datamonikers-v1-proto = ">=1.0.0" ni-measurements-metadata-v1-proto = ">=1.0.0" -ni-protobuf-types = ">=1.1.0" +ni-protobuf-types = ">=1.2.0.dev0" protobuf = ">=4.21" [[package]] @@ -2512,18 +2512,18 @@ protobuf = ">=4.21" [[package]] name = "ni-protobuf-types" -version = "1.1.0" +version = "1.2.0.dev0" description = "Protobuf data types for NI gRPC APIs" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "ni_protobuf_types-1.1.0-py3-none-any.whl", hash = "sha256:0c21c096cf8577483dade081c571305fe8d4cc759ce2c780e7437129a375942c"}, - {file = "ni_protobuf_types-1.1.0.tar.gz", hash = "sha256:98f0583405e219f6e128133c2f6c033f03cd83ebd3ce8098ad74ab99b8a253c1"}, + {file = "ni_protobuf_types-1.2.0.dev0-py3-none-any.whl", hash = "sha256:9e06049582d8eb0b7412a3fdbb628c45f8adb1d1f26107959ce2eb469c1dc1c8"}, + {file = "ni_protobuf_types-1.2.0.dev0.tar.gz", hash = "sha256:6d9ce29fd577d9d9b6da69fe882b894d8db5303a1f4b5dfb584603234f746463"}, ] [package.dependencies] -nitypes = ">=1.1.0dev1" +nitypes = ">=1.1.0.dev1" protobuf = ">=4.21" [[package]] @@ -2542,8 +2542,8 @@ files = [ black = ">=23.1,<26.0" click = ">=7.1.2" flake8 = [ - {version = ">=5.0,<6.0", markers = "python_version >= \"3.7\" and python_version < \"3.12\""}, {version = ">=6.1,<7.0", markers = "python_version >= \"3.12\" and python_version < \"4.0\""}, + {version = ">=5.0,<6.0", markers = "python_version >= \"3.7\" and python_version < \"3.12\""}, ] flake8-black = ">=0.2.1" flake8-docstrings = ">=1.5.0" @@ -2552,8 +2552,8 @@ isort = ">=5.10" pathspec = ">=0.11.1" pep8-naming = ">=0.11.1" pycodestyle = [ - {version = ">=2.9,<3.0", markers = "python_version >= \"3.7\" and python_version < \"3.12\""}, {version = ">=2.11,<3.0", markers = "python_version >= \"3.12\" and python_version < \"4.0\""}, + {version = ">=2.9,<3.0", markers = "python_version >= \"3.7\" and python_version < \"3.12\""}, ] setuptools = "<82" toml = ">=0.10.1" @@ -2573,8 +2573,8 @@ files = [ [package.dependencies] hightime = ">=0.2.2" numpy = [ - {version = ">=1.22", markers = "python_version >= \"3.9\" and python_version < \"3.13\""}, {version = ">=2.1", markers = "python_version >= \"3.13\" and python_version < \"4.0\""}, + {version = ">=1.22", markers = "python_version >= \"3.9\" and python_version < \"3.13\""}, ] typing-extensions = ">=4.13.2" @@ -4469,4 +4469,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "7b3289fda614c93cbdfd701d40173d131938304eb1870c23d40bb934fdb723b1" +content-hash = "8d65fe6510964e28aefc1f5b3337ce929cf90ad21f433d6738e64521b0dc8827" diff --git a/pyproject.toml b/pyproject.toml index 3824786..0631e41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ requires-poetry = '>=2.1,<3.0' [tool.poetry.dependencies] python = "^3.10" protobuf = {version=">=4.21"} -ni-measurements-data-v1-client = { version = ">=1.1.0dev0", allow-prereleases = true } +ni-measurements-data-v1-client = { version = ">=1.1.0dev1", allow-prereleases = true } ni-measurements-metadata-v1-client = { version = ">=1.0.0" } ni-protobuf-types = { version = ">=1.1.0" } hightime = { version = ">=1.0.0" } diff --git a/src/ni/datastore/data/_data_store_client.py b/src/ni/datastore/data/_data_store_client.py index 3705a38..c0ad9ed 100644 --- a/src/ni/datastore/data/_data_store_client.py +++ b/src/ni/datastore/data/_data_store_client.py @@ -295,7 +295,7 @@ def publish_measurement_batch( software_item_ids: Iterable[str] = tuple(), notes: str = "", ) -> Sequence[str]: - """Publish multiple scalar measurements at once for parametric sweeps. + """Publish multiple measurements at once for parametric sweeps. Args: name: The name used for associating/grouping @@ -303,7 +303,7 @@ def publish_measurement_batch( iterations. For example, "Temperature" can be used for associating temperature readings across multiple iterations. - values: The values of the (scalar) measurement being published + values: The values of the measurement being published across N iterations. step_id: The ID of the step associated with this measurement. This @@ -339,10 +339,9 @@ def publish_measurement_batch( notes: Any notes to be associated with the published measurements. Returns: - Sequence[str]: The ids of the published measurement ids. - NOTE: Using a Sequence is for future flexibility. - This sequence will currently always have a single measurement id - returned. + Sequence[str]: The IDs of the corresponding PublishedMeasurements. A single + ID will be returned when publishing scalar measurement values. + N IDs will be returned when publishing (N) non-scalar measurement values. """ publish_request = PublishMeasurementBatchRequest( name=name, diff --git a/src/ni/datastore/data/_grpc_conversion.py b/src/ni/datastore/data/_grpc_conversion.py index 2fcab11..f3ca4b5 100644 --- a/src/ni/datastore/data/_grpc_conversion.py +++ b/src/ni/datastore/data/_grpc_conversion.py @@ -4,7 +4,8 @@ import datetime as std_datetime import logging -from typing import Iterable, cast +from itertools import chain +from typing import Any, Callable, Iterable, Sequence, cast import hightime as ht import numpy as np @@ -49,6 +50,129 @@ _logger = logging.getLogger(__name__) +def _copy_batch_values( + repeated_field: Any, + batch_values: Iterable[object], + is_supported: Callable[[object], bool], + convert_value: Callable[[Any], Any], + error_message: str, +) -> None: + for value in batch_values: + if not is_supported(value): + raise TypeError(error_message) + repeated_field.add().CopyFrom(convert_value(value)) + + +def _populate_vector_batch_values( + publish_request: PublishMeasurementBatchRequest, values: Iterable[object] +) -> None: + _copy_batch_values( + publish_request.vector_values.vectors, + values, + lambda value: isinstance(value, Vector), + vector_to_protobuf, + "Unsupported iterable: all values must be Vector.", + ) + + +def _populate_analog_waveform_batch_values( + publish_request: PublishMeasurementBatchRequest, + first_value: AnalogWaveform[Any], + values: Iterable[object], +) -> None: + if first_value.dtype == np.float64: + _copy_batch_values( + publish_request.double_analog_waveform_values.waveforms, + values, + lambda value: isinstance(value, AnalogWaveform) and value.dtype == np.float64, + float64_analog_waveform_to_protobuf, + "Unsupported iterable: all values must be float64 AnalogWaveform.", + ) + return + elif first_value.dtype == np.int16: + _copy_batch_values( + publish_request.i16_analog_waveform_values.waveforms, + values, + lambda value: isinstance(value, AnalogWaveform) and value.dtype == np.int16, + int16_analog_waveform_to_protobuf, + "Unsupported iterable: all values must be int16 AnalogWaveform.", + ) + return + raise TypeError(f"Unsupported AnalogWaveform dtype: {first_value.dtype}") + + +def _populate_complex_waveform_batch_values( + publish_request: PublishMeasurementBatchRequest, + first_value: ComplexWaveform[Any], + values: Iterable[object], +) -> None: + if first_value.dtype == np.complex128: + _copy_batch_values( + publish_request.double_complex_waveform_values.waveforms, + values, + lambda value: isinstance(value, ComplexWaveform) and value.dtype == np.complex128, + float64_complex_waveform_to_protobuf, + "Unsupported iterable: all values must be complex128 ComplexWaveform.", + ) + return + if first_value.dtype == ComplexInt32DType: + _copy_batch_values( + publish_request.i16_complex_waveform_values.waveforms, + values, + lambda value: isinstance(value, ComplexWaveform) and value.dtype == ComplexInt32DType, + int16_complex_waveform_to_protobuf, + "Unsupported iterable: all values must be ComplexWaveform with ComplexInt32DType.", + ) + return + raise TypeError(f"Unsupported ComplexWaveform dtype: {first_value.dtype}") + + +def _populate_spectrum_batch_values( + publish_request: PublishMeasurementBatchRequest, + first_value: Spectrum[Any], + values: Iterable[object], +) -> None: + if first_value.dtype != np.float64: + raise TypeError(f"Unsupported Spectrum dtype: {first_value.dtype}") + + _copy_batch_values( + publish_request.double_spectrum_values.waveforms, + values, + lambda value: isinstance(value, Spectrum) and value.dtype == np.float64, + float64_spectrum_to_protobuf, + "Unsupported iterable: all values must be float64 Spectrum.", + ) + + +def _populate_digital_waveform_batch_values( + publish_request: PublishMeasurementBatchRequest, values: Iterable[object] +) -> None: + _copy_batch_values( + publish_request.digital_waveform_values.waveforms, + values, + lambda value: isinstance(value, DigitalWaveform), + digital_waveform_to_protobuf, + "Unsupported iterable: all values must be DigitalWaveform.", + ) + + +def _populate_xydata_batch_values( + publish_request: PublishMeasurementBatchRequest, + first_value: XYData[Any], + values: Iterable[object], +) -> None: + if first_value.dtype != np.float64: + raise TypeError(f"Unsupported XYData dtype: {first_value.dtype}") + + _copy_batch_values( + publish_request.x_y_data_values.x_y_data, + values, + lambda value: isinstance(value, XYData) and value.dtype == np.float64, + float64_xydata_to_protobuf, + "Unsupported iterable: all values must be float64 XYData.", + ) + + def populate_publish_condition_request_value( publish_request: PublishConditionRequest, value: object ) -> None: @@ -78,11 +202,16 @@ def populate_publish_condition_batch_request_values( elif isinstance(values, Iterable): if not values: raise ValueError("Cannot publish an empty Iterable.") + + # Vector initialization requires the Iterable to be iterated over multiple times. + # We convert the Iterable to a list if we don't know that the Iterable type + # supports multiple iterations. + condition_values = values if isinstance(values, Sequence) else list(values) try: - vector = Vector(values) + vector = Vector(condition_values) except (TypeError, ValueError): raise TypeError( - f"Unsupported iterable: {values}. Subtype must be bool, float, int, or string." + f"Unsupported iterable: {condition_values}. Subtype must be bool, float, int, or string." ) publish_request.scalar_values.CopyFrom(vector_to_protobuf(vector)) @@ -162,16 +291,34 @@ def populate_publish_measurement_batch_request_values( if isinstance(values, Vector): publish_request.scalar_values.CopyFrom(vector_to_protobuf(values)) elif isinstance(values, Iterable): - if not values: - raise ValueError("Cannot publish an empty Iterable.") + values_iterator = iter(values) try: - vector = Vector(values) - except (TypeError, ValueError): - raise TypeError( - f"Unsupported iterable: {values}. Subtype must be bool, float, int, or string." - ) + first_value = next(values_iterator) + except StopIteration as exc: + raise ValueError("Cannot publish an empty Iterable.") from exc - publish_request.scalar_values.CopyFrom(vector_to_protobuf(vector)) + all_values = chain([first_value], values_iterator) + if isinstance(first_value, Vector): + _populate_vector_batch_values(publish_request, all_values) + elif isinstance(first_value, AnalogWaveform): + _populate_analog_waveform_batch_values(publish_request, first_value, all_values) + elif isinstance(first_value, ComplexWaveform): + _populate_complex_waveform_batch_values(publish_request, first_value, all_values) + elif isinstance(first_value, Spectrum): + _populate_spectrum_batch_values(publish_request, first_value, all_values) + elif isinstance(first_value, DigitalWaveform): + _populate_digital_waveform_batch_values(publish_request, all_values) + elif isinstance(first_value, XYData): + _populate_xydata_batch_values(publish_request, first_value, all_values) + else: + try: + vector = Vector(cast(Iterable[bool | int | float | str], list(all_values))) + except (TypeError, ValueError): + raise TypeError( + f"Unsupported iterable. Subtype must be bool, float, int, string, Vector, " + "AnalogWaveform, ComplexWaveform, Spectrum, DigitalWaveform, or XYData." + ) + publish_request.scalar_values.CopyFrom(vector_to_protobuf(vector)) else: raise TypeError( f"Unsupported measurement values type: {type(values)}. Please consult the documentation." diff --git a/tests/acceptance/test_publish_measurement_batch_and_read_data.py b/tests/acceptance/test_publish_measurement_batch_and_read_data.py index d148a51..cacc131 100644 --- a/tests/acceptance/test_publish_measurement_batch_and_read_data.py +++ b/tests/acceptance/test_publish_measurement_batch_and_read_data.py @@ -1,20 +1,22 @@ """Acceptance tests that publish various batch measurement values then reads the data back.""" +import numpy as np from ni.datastore.data import ( DataStoreClient, Step, TestResult, ) from nitypes.vector import Vector +from nitypes.waveform import AnalogWaveform from utilities import DataStoreContext -def test___publish_float___read_measurement_value_returns_vector( +def test___publish_batch_floats___read_measurement_value_returns_vector( acceptance_test_context: DataStoreContext, ) -> None: with DataStoreClient() as data_store_client: # Create TestResult metadata - test_result_name = "python batch publish float acceptance test" + test_result_name = "python publish batch floats acceptance test" test_result = TestResult(name=test_result_name) test_result_id = data_store_client.create_test_result(test_result) @@ -22,7 +24,7 @@ def test___publish_float___read_measurement_value_returns_vector( step = Step(name="Initial step", test_result_id=test_result_id) step_id = data_store_client.create_step(step) published_measurement_ids = data_store_client.publish_measurement_batch( - name="python batch publish float", + name="Test measurement", values=[1.0, 2.0, 3.0, 4.0], step_id=step_id, ) @@ -45,7 +47,7 @@ def test___publish_batch_vector___read_measurement_value_returns_vector( ) -> None: with DataStoreClient() as data_store_client: # Create TestResult metadata - test_result_name = "python publish scalar acceptance test" + test_result_name = "python publish batch Vector acceptance test" test_result = TestResult(name=test_result_name) test_result_id = data_store_client.create_test_result(test_result) @@ -56,7 +58,7 @@ def test___publish_batch_vector___read_measurement_value_returns_vector( step = Step(name="Initial step", test_result_id=test_result_id) step_id = data_store_client.create_step(step) published_measurement_ids = data_store_client.publish_measurement_batch( - name="python publish scalar", + name="Test measurement", values=expected_vector, step_id=step_id, ) @@ -71,3 +73,69 @@ def test___publish_batch_vector___read_measurement_value_returns_vector( published_measurement, expected_type=Vector ) assert vector == expected_vector + + +def test___publish_batch_double_analog_waveforms___read_measurement_value_returns_each_analog_waveform( + acceptance_test_context: DataStoreContext, +) -> None: + with DataStoreClient() as data_store_client: + test_result_name = "python publish batch AnalogWaveforms acceptance test" + test_result = TestResult(name=test_result_name) + test_result_id = data_store_client.create_test_result(test_result) + expected_waveforms = [ + AnalogWaveform(sample_count=3, raw_data=np.array([1.0, 2.0, 3.0], dtype=np.float64)), + AnalogWaveform(sample_count=3, raw_data=np.array([4.0, 5.0, 6.0], dtype=np.float64)), + ] + step = Step(name="Initial step", test_result_id=test_result_id) + step_id = data_store_client.create_step(step) + + published_measurement_ids = data_store_client.publish_measurement_batch( + name="Test measurement", + values=expected_waveforms, + step_id=step_id, + ) + + assert len(published_measurement_ids) == 2 + published_measurement_one = data_store_client.get_measurement(published_measurement_ids[0]) + published_measurement_two = data_store_client.get_measurement(published_measurement_ids[1]) + published_waveform_one = data_store_client.read_measurement_value( + published_measurement_one, expected_type=AnalogWaveform + ) + published_waveform_two = data_store_client.read_measurement_value( + published_measurement_two, expected_type=AnalogWaveform + ) + assert published_waveform_one == expected_waveforms[0] + assert published_waveform_two == expected_waveforms[1] + + +def test___publish_batch_vectors___read_measurement_value_returns_each_vector( + acceptance_test_context: DataStoreContext, +) -> None: + with DataStoreClient() as data_store_client: + test_result_name = "python publish batch Vectors acceptance test" + test_result = TestResult(name=test_result_name) + test_result_id = data_store_client.create_test_result(test_result) + expected_vectors = [ + Vector(values=[1, 2, 3], units="Volts"), + Vector(values=[4, 5, 6], units="Volts"), + ] + step = Step(name="Initial step", test_result_id=test_result_id) + step_id = data_store_client.create_step(step) + + published_measurement_ids = data_store_client.publish_measurement_batch( + name="Test measurement", + values=expected_vectors, + step_id=step_id, + ) + + assert len(published_measurement_ids) == 2 + published_measurement_one = data_store_client.get_measurement(published_measurement_ids[0]) + published_measurement_two = data_store_client.get_measurement(published_measurement_ids[1]) + published_vector_one = data_store_client.read_measurement_value( + published_measurement_one, expected_type=Vector + ) + published_vector_two = data_store_client.read_measurement_value( + published_measurement_two, expected_type=Vector + ) + assert published_vector_one == expected_vectors[0] + assert published_vector_two == expected_vectors[1] diff --git a/tests/unit/data/test_grpc_conversion.py b/tests/unit/data/test_grpc_conversion.py index c8893bb..f5b930b 100644 --- a/tests/unit/data/test_grpc_conversion.py +++ b/tests/unit/data/test_grpc_conversion.py @@ -1,3 +1,5 @@ +from typing import Any, Iterable + import numpy as np import pytest from ni.datastore.data._grpc_conversion import ( @@ -15,8 +17,11 @@ from ni.protobuf.types import ( scalar_pb2, vector_pb2, + vector_wrappers_pb2, waveform_pb2, + waveform_wrappers_pb2, xydata_pb2, + xydata_wrappers_pb2, ) from nitypes.complex import ComplexInt32DType from nitypes.scalar import Scalar @@ -61,7 +66,7 @@ def test___python_scalar_object___populate_condition___condition_updated_correct # ======================================================== # Populate Condition Batch # ======================================================== -def test___python_vector_object___populate_batch_condition___condition_updated_correctly() -> None: +def test___python_vector_object___populate_condition_batch___condition_updated_correctly() -> None: vector_obj = Vector([1.0, 2.0, 3.0], "amps") request = PublishConditionBatchRequest() populate_publish_condition_batch_request_values(request, vector_obj) @@ -71,6 +76,21 @@ def test___python_vector_object___populate_batch_condition___condition_updated_c assert request.scalar_values.attributes["NI_UnitDescription"].string_value == "amps" +def test___python_scalar_generator_iterable___populate_condition_batch___condition_updated_correctly() -> ( + None +): + def _values() -> Iterable[float]: + yield 1.5 + yield 2.5 + yield 3.5 + + request = PublishConditionBatchRequest() + populate_publish_condition_batch_request_values(request, _values()) + + assert isinstance(request.scalar_values, vector_pb2.Vector) + assert list(request.scalar_values.double_array.values) == [1.5, 2.5, 3.5] + + # ======================================================== # Populate Measurement # ======================================================== @@ -208,13 +228,487 @@ def test___python_float64_xydata___populate_measurement___measurement_updated_co # ======================================================== # Populate Measurement Batch # ======================================================== -def test___python_vector_object___populate_measurement_batch___condition_updated_correctly() -> ( +def _assert_scalar_values( + request: PublishMeasurementBatchRequest, attribute_name: str, expected_values: list[object] +) -> None: + assert isinstance(request.scalar_values, vector_pb2.Vector) + assert list(getattr(request.scalar_values, attribute_name).values) == expected_values + + +@pytest.mark.parametrize( + "values, attribute_name, expected_unit", + [ + (Vector([1.5, 2.5, 3.5], "amps"), "double_array", "amps"), + (Vector([1, 2, 3], "volts"), "sint32_array", "volts"), + (Vector([True, False, True], "state"), "bool_array", "state"), + (Vector(["one", "two", "three"], "labels"), "string_array", "labels"), + ], +) +def test___python_vector_object___populate_measurement_batch___measurement_updated_correctly( + values: Vector[Any], attribute_name: str, expected_unit: str +) -> None: + request = PublishMeasurementBatchRequest() + populate_publish_measurement_batch_request_values(request, values) + + _assert_scalar_values(request, attribute_name, list(values)) + assert request.scalar_values.attributes["NI_UnitDescription"].string_value == expected_unit + + +@pytest.mark.parametrize( + "values, attribute_name", + [ + ([1.5, 2.5, 3.5], "double_array"), + ([1, 2, 3], "sint32_array"), + ([True, False, True], "bool_array"), + (["one", "two", "three"], "string_array"), + ], +) +def test___python_scalar_iterable___populate_measurement_batch___measurement_updated_correctly( + values: list[object], attribute_name: str +) -> None: + request = PublishMeasurementBatchRequest() + populate_publish_measurement_batch_request_values(request, values) + + _assert_scalar_values(request, attribute_name, values) + + +def test___python_vector_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( None ): - vector_obj = Vector([1.0, 2.0, 3.0], "amps") + values = [Vector([1.0, 2.0]), Vector([3.0, 4.0])] request = PublishMeasurementBatchRequest() - populate_publish_measurement_batch_request_values(request, vector_obj) - assert isinstance(request.scalar_values, vector_pb2.Vector) - assert list(request.scalar_values.double_array.values) == [1.0, 2.0, 3.0] - assert request.scalar_values.attributes["NI_UnitDescription"].string_value == "amps" + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance(request.vector_values, vector_wrappers_pb2.VectorArrayValue) + assert len(request.vector_values.vectors) == 2 + assert list(request.vector_values.vectors[0].double_array.values) == [1.0, 2.0] + assert list(request.vector_values.vectors[1].double_array.values) == [3.0, 4.0] + + +def test___python_float64_analog_waveform_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + AnalogWaveform(sample_count=2, raw_data=np.array([1.25, -2.5], dtype=np.float64)), + AnalogWaveform(sample_count=3, raw_data=np.array([3.5, 4.75, -6.0], dtype=np.float64)), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.double_analog_waveform_values, waveform_wrappers_pb2.DoubleAnalogWaveformArrayValue + ) + assert len(request.double_analog_waveform_values.waveforms) == 2 + assert list(request.double_analog_waveform_values.waveforms[0].y_data) == [1.25, -2.5] + assert list(request.double_analog_waveform_values.waveforms[1].y_data) == [3.5, 4.75, -6.0] + + +def test___python_int16_analog_waveform_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + AnalogWaveform(sample_count=2, raw_data=np.array([12, -3], dtype=np.int16)), + AnalogWaveform(sample_count=3, raw_data=np.array([7, 0, -8], dtype=np.int16)), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.i16_analog_waveform_values, waveform_wrappers_pb2.I16AnalogWaveformArrayValue + ) + assert len(request.i16_analog_waveform_values.waveforms) == 2 + assert list(request.i16_analog_waveform_values.waveforms[0].y_data) == [12, -3] + assert list(request.i16_analog_waveform_values.waveforms[1].y_data) == [7, 0, -8] + + +def test___python_float64_complex_waveform_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + ComplexWaveform( + sample_count=2, raw_data=np.array([1.0 + 2.0j, -3.0 + 4.5j], dtype=np.complex128) + ), + ComplexWaveform( + sample_count=3, + raw_data=np.array([0.5 - 1.5j, 2.25 + 0.75j, -4.0 - 2.0j], dtype=np.complex128), + ), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.double_complex_waveform_values, + waveform_wrappers_pb2.DoubleComplexWaveformArrayValue, + ) + assert len(request.double_complex_waveform_values.waveforms) == 2 + assert list(request.double_complex_waveform_values.waveforms[0].y_data) == [1.0, 2.0, -3.0, 4.5] + assert list(request.double_complex_waveform_values.waveforms[1].y_data) == [ + 0.5, + -1.5, + 2.25, + 0.75, + -4.0, + -2.0, + ] + + +def test___python_int16_complex_waveform_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([(11, -2), (5, 9)], dtype=ComplexInt32DType), + ), + ComplexWaveform( + sample_count=3, + raw_data=np.array([(-7, 4), (0, -6), (8, 3)], dtype=ComplexInt32DType), + ), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.i16_complex_waveform_values, waveform_wrappers_pb2.I16ComplexWaveformArrayValue + ) + assert len(request.i16_complex_waveform_values.waveforms) == 2 + assert list(request.i16_complex_waveform_values.waveforms[0].y_data) == [11, -2, 5, 9] + assert list(request.i16_complex_waveform_values.waveforms[1].y_data) == [-7, 4, 0, -6, 8, 3] + + +def test___python_float64_spectrum_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + Spectrum.from_array_1d(np.array([1.0, 2.0])), + Spectrum.from_array_1d(np.array([3.0, 4.0])), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.double_spectrum_values, waveform_wrappers_pb2.DoubleSpectrumArrayValue + ) + assert len(request.double_spectrum_values.waveforms) == 2 + assert list(request.double_spectrum_values.waveforms[0].data) == [1.0, 2.0] + assert list(request.double_spectrum_values.waveforms[1].data) == [3.0, 4.0] + + +def test___python_uint8_digital_waveform_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + DigitalWaveform.from_lines([1], np.uint8), + DigitalWaveform.from_lines([0], np.uint8), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance( + request.digital_waveform_values, waveform_wrappers_pb2.DigitalWaveformArrayValue + ) + assert len(request.digital_waveform_values.waveforms) == 2 + assert request.digital_waveform_values.waveforms[0].y_data == b"\x01" + assert request.digital_waveform_values.waveforms[1].y_data == b"\x00" + + +def test___python_float64_xydata_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + values = [ + XYData.from_arrays_1d([1.0], [2.0], np.float64), + XYData.from_arrays_1d([3.0], [4.0], np.float64), + ] + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, values) + + assert isinstance(request.x_y_data_values, xydata_wrappers_pb2.DoubleXYDataArrayValue) + assert len(request.x_y_data_values.x_y_data) == 2 + assert list(request.x_y_data_values.x_y_data[0].x_data) == [1.0] + assert list(request.x_y_data_values.x_y_data[0].y_data) == [2.0] + assert list(request.x_y_data_values.x_y_data[1].x_data) == [3.0] + assert list(request.x_y_data_values.x_y_data[1].y_data) == [4.0] + + +def test___python_scalar_generator_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + def _values() -> Iterable[float]: + yield 1.5 + yield 2.5 + yield 3.5 + + request = PublishMeasurementBatchRequest() + populate_publish_measurement_batch_request_values(request, _values()) + + _assert_scalar_values(request, "double_array", [1.5, 2.5, 3.5]) + + +def test___python_non_scalar_generator_iterable___populate_measurement_batch___measurement_updated_correctly() -> ( + None +): + def _values() -> Iterable[AnalogWaveform[np.float64]]: + yield AnalogWaveform(sample_count=2, raw_data=np.array([1.25, -2.5], dtype=np.float64)) + yield AnalogWaveform(sample_count=3, raw_data=np.array([3.5, 4.75, -6.0], dtype=np.float64)) + + request = PublishMeasurementBatchRequest() + + populate_publish_measurement_batch_request_values(request, _values()) + + assert isinstance( + request.double_analog_waveform_values, waveform_wrappers_pb2.DoubleAnalogWaveformArrayValue + ) + assert len(request.double_analog_waveform_values.waveforms) == 2 + assert list(request.double_analog_waveform_values.waveforms[0].y_data) == [1.25, -2.5] + assert list(request.double_analog_waveform_values.waveforms[1].y_data) == [3.5, 4.75, -6.0] + + +@pytest.mark.parametrize( + "values, error_message", + [ + pytest.param( + [ + Vector([1.0, 2.0]), + AnalogWaveform(sample_count=2, raw_data=np.array([1.0, 2.0])), + ], + "Unsupported iterable: all values must be Vector.", + id="vector", + ), + pytest.param( + [ + AnalogWaveform(sample_count=2, raw_data=np.array([1.25, -2.5], dtype=np.float64)), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be float64 AnalogWaveform.", + id="float64_analog_waveform", + ), + pytest.param( + [ + AnalogWaveform(sample_count=2, raw_data=np.array([12, -3], dtype=np.int16)), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be int16 AnalogWaveform.", + id="int16_analog_waveform", + ), + pytest.param( + [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([1.0 + 2.0j, -3.0 + 4.5j], dtype=np.complex128), + ), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be complex128 ComplexWaveform.", + id="float64_complex_waveform", + ), + pytest.param( + [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([(11, -2), (5, 9)], dtype=ComplexInt32DType), + ), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be ComplexWaveform with ComplexInt32DType.", + id="int16_complex_waveform", + ), + pytest.param( + [ + Spectrum.from_array_1d(np.array([1.0, 2.0])), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be float64 Spectrum.", + id="spectrum", + ), + pytest.param( + [ + DigitalWaveform.from_lines([1], np.uint8), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be DigitalWaveform.", + id="digital_waveform", + ), + pytest.param( + [ + XYData.from_arrays_1d([1.0], [2.0], np.float64), + Vector([1.0, 2.0]), + ], + "Unsupported iterable: all values must be float64 XYData.", + id="xydata", + ), + ], +) +def test___python_iterable_with_mismatched_second_element___populate_measurement_batch___raises_error( + values: list[object], error_message: str +) -> None: + request = PublishMeasurementBatchRequest() + + with pytest.raises(TypeError, match=error_message): + populate_publish_measurement_batch_request_values(request, values) + + +@pytest.mark.parametrize( + "values, error_message", + [ + pytest.param( + [ + AnalogWaveform(sample_count=2, raw_data=np.array([1.25, -2.5], dtype=np.float64)), + AnalogWaveform(sample_count=3, raw_data=np.array([7, 0, -8], dtype=np.int16)), + ], + "Unsupported iterable: all values must be float64 AnalogWaveform.", + id="float64_analog_waveform", + ), + pytest.param( + [ + AnalogWaveform(sample_count=2, raw_data=np.array([12, -3], dtype=np.int16)), + AnalogWaveform( + sample_count=3, raw_data=np.array([3.5, 4.75, -6.0], dtype=np.float64) + ), + ], + "Unsupported iterable: all values must be int16 AnalogWaveform.", + id="int16_analog_waveform", + ), + pytest.param( + [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([1.0 + 2.0j, -3.0 + 4.5j], dtype=np.complex128), + ), + ComplexWaveform( + sample_count=3, + raw_data=np.array([(-7, 4), (0, -6), (8, 3)], dtype=ComplexInt32DType), + ), + ], + "Unsupported iterable: all values must be complex128 ComplexWaveform.", + id="float64_complex_waveform", + ), + pytest.param( + [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([(11, -2), (5, 9)], dtype=ComplexInt32DType), + ), + ComplexWaveform( + sample_count=3, + raw_data=np.array([0.5 - 1.5j, 2.25 + 0.75j, -4.0 - 2.0j], dtype=np.complex128), + ), + ], + "Unsupported iterable: all values must be ComplexWaveform with ComplexInt32DType.", + id="int16_complex_waveform", + ), + pytest.param( + [ + Spectrum.from_array_1d(np.array([1.0, 2.0])), + Spectrum.from_array_1d(np.array([3.0, 4.0], dtype=np.float32)), + ], + "Unsupported iterable: all values must be float64 Spectrum.", + id="spectrum", + ), + pytest.param( + [ + XYData.from_arrays_1d([1.0], [2.0], np.float64), + XYData.from_arrays_1d([3.0], [4.0], np.float32), + ], + "Unsupported iterable: all values must be float64 XYData.", + id="xydata", + ), + ], +) +def test___python_iterable_with_mismatched_second_dtype___populate_measurement_batch___raises_error( + values: list[object], error_message: str +) -> None: + request = PublishMeasurementBatchRequest() + + with pytest.raises(TypeError, match=error_message): + populate_publish_measurement_batch_request_values(request, values) + + +@pytest.mark.parametrize( + "values, error_message", + [ + pytest.param( + [ + AnalogWaveform(sample_count=2, raw_data=np.array([1.25, -2.5], dtype=np.float32)), + AnalogWaveform( + sample_count=3, raw_data=np.array([3.5, 4.75, -6.0], dtype=np.float32) + ), + ], + "Unsupported AnalogWaveform dtype", + id="analog_waveform", + ), + pytest.param( + [ + ComplexWaveform( + sample_count=2, + raw_data=np.array([1.0 + 2.0j, -3.0 + 4.5j], dtype=np.complex64), + ), + ComplexWaveform( + sample_count=3, + raw_data=np.array([0.5 - 1.5j, 2.25 + 0.75j, -4.0 - 2.0j], dtype=np.complex64), + ), + ], + "Unsupported ComplexWaveform dtype", + id="complex_waveform", + ), + pytest.param( + [ + Spectrum.from_array_1d(np.array([1.0, 2.0], dtype=np.float32)), + Spectrum.from_array_1d(np.array([3.0, 4.0], dtype=np.float32)), + ], + "Unsupported Spectrum dtype", + id="spectrum", + ), + pytest.param( + [ + XYData.from_arrays_1d([1.0], [2.0], np.float32), + XYData.from_arrays_1d([3.0], [4.0], np.float32), + ], + "Unsupported XYData dtype", + id="xydata", + ), + ], +) +def test___python_unsupported_dtype_iterable___populate_measurement_batch___raises_error( + values: list[object], error_message: str +) -> None: + request = PublishMeasurementBatchRequest() + + with pytest.raises(TypeError, match=error_message): + populate_publish_measurement_batch_request_values(request, values) + + +def test___empty_iterable___populate_measurement_batch___raises_error() -> None: + request = PublishMeasurementBatchRequest() + + with pytest.raises(ValueError, match="Cannot publish an empty Iterable."): + populate_publish_measurement_batch_request_values(request, []) + + +def test___python_unsupported_iterable___populate_measurement_batch___raises_error() -> None: + values = [object(), object()] + request = PublishMeasurementBatchRequest() + + with pytest.raises( + TypeError, + match="Unsupported iterable. Subtype must be", + ): + populate_publish_measurement_batch_request_values(request, values) + + +def test___python_non_iterable___populate_measurement_batch___raises_error() -> None: + values = 42 + request = PublishMeasurementBatchRequest() + + with pytest.raises( + TypeError, + match="Unsupported measurement values type", + ): + populate_publish_measurement_batch_request_values(request, values) diff --git a/tests/unit/data/test_publish_measurement.py b/tests/unit/data/test_publish_measurement.py index ac3742e..580918d 100644 --- a/tests/unit/data/test_publish_measurement.py +++ b/tests/unit/data/test_publish_measurement.py @@ -420,7 +420,7 @@ def test___unsupported_list___publish_measurement_batch___raises_type_error( step_id="step_id", ) - assert exc.value.args[0].startswith("Unsupported iterable:") + assert exc.value.args[0].startswith("Unsupported iterable.") def test___empty_list___publish_measurement_batch___raises_type_error(