|
6 | 6 | import subprocess |
7 | 7 | from abc import ABC, abstractmethod |
8 | 8 | from dataclasses import dataclass |
| 9 | +from subprocess import run |
9 | 10 | from typing import Dict, Iterable, List, Optional, Tuple |
10 | 11 |
|
11 | 12 | import psutil |
@@ -487,3 +488,99 @@ def from_utils( |
487 | 488 | logger.warning("Could not read AppleSiliconChip model.") |
488 | 489 |
|
489 | 490 | return cls(output_dir=output_dir, model=model, chip_part=chip_part) |
| 491 | + |
| 492 | + |
| 493 | +@dataclass |
| 494 | +class Raspberry(BaseHardware): |
| 495 | + def __init__( |
| 496 | + self, |
| 497 | + output_dir: str, |
| 498 | + model: str, |
| 499 | + chip_part: str = "CPU", |
| 500 | + ): |
| 501 | + if chip_part == "CPU": |
| 502 | + self.WANTED_COMPONENTS = ( |
| 503 | + "3V7_WL_SW", |
| 504 | + "3V3_SYS", |
| 505 | + "1V8_SYS", |
| 506 | + "1V1_SYS", |
| 507 | + "0V8_SW", |
| 508 | + "VDD_CORE", |
| 509 | + "3V3_DAC", |
| 510 | + "3V3_ADC", |
| 511 | + "0V8_AON", |
| 512 | + ) |
| 513 | + elif chip_part == "RAM": |
| 514 | + self.WANTED_COMPONENTS = ("DDR_VDD2", "DDR_VDD2", "DDR_VDDQ") |
| 515 | + else: |
| 516 | + raise Exception("Unknown chip part", chip_part) |
| 517 | + |
| 518 | + self._output_dir = output_dir |
| 519 | + self._model = model |
| 520 | + self.chip_part = chip_part |
| 521 | + |
| 522 | + def __repr__(self) -> str: |
| 523 | + return f"Raspberry ({self._model} > {self.chip_part})" |
| 524 | + |
| 525 | + def _get_power(self) -> Power: |
| 526 | + """ """ |
| 527 | + measure: Dict = self.get_measure() |
| 528 | + return Power.from_watts(measure["power"]) |
| 529 | + |
| 530 | + def _get_energy(self, delay: Time) -> Energy: |
| 531 | + """ |
| 532 | + Get Chip part energy deltas |
| 533 | + Args: |
| 534 | + chip_part (str): Chip part to get power from (Processor, GPU, etc.) |
| 535 | + :return: energy in kWh |
| 536 | + """ |
| 537 | + energy = Energy.from_power_and_time( |
| 538 | + power=self._get_power(), time=Time.from_seconds(delay) |
| 539 | + ) |
| 540 | + return energy |
| 541 | + |
| 542 | + def total_power(self) -> Power: |
| 543 | + return self._get_power() |
| 544 | + |
| 545 | + def start(self): |
| 546 | + # ... |
| 547 | + ... |
| 548 | + |
| 549 | + def get_model(self): |
| 550 | + return self._model |
| 551 | + |
| 552 | + def get_measure(self): |
| 553 | + components = {} |
| 554 | + res = run(["vcgencmd", "pmic_read_adc"], capture_output=True) |
| 555 | + lines = res.stdout.decode("utf-8").splitlines() |
| 556 | + for line in lines: |
| 557 | + res = re.search( |
| 558 | + "([A-Z_0-9]+)_[VA] (current|volt)\(([0-9]+)\)=([0-9.]+)", # noqa: W605 |
| 559 | + line, |
| 560 | + ) |
| 561 | + component_name, measure_type, idx, value = res.groups() |
| 562 | + component = components[component_name] = components.get(component_name, {}) |
| 563 | + component[measure_type] = float(value) |
| 564 | + pi_power = 0 |
| 565 | + |
| 566 | + for component_name, component in components.items(): |
| 567 | + try: |
| 568 | + component["power"] = component["volt"] * component["current"] |
| 569 | + if component_name in self.WANTED_COMPONENTS: |
| 570 | + pi_power += component["power"] |
| 571 | + except Exception: |
| 572 | + ... |
| 573 | + return { |
| 574 | + "power": pi_power, |
| 575 | + } |
| 576 | + |
| 577 | + @classmethod |
| 578 | + def from_utils( |
| 579 | + cls, output_dir: str, model: Optional[str] = None, chip_part: str = "CPU" |
| 580 | + ) -> "Raspberry": |
| 581 | + if model is None: |
| 582 | + model = detect_cpu_model() |
| 583 | + if model is None: |
| 584 | + logger.warning("Could not read Raspberry model.") |
| 585 | + |
| 586 | + return cls(output_dir=output_dir, model=model, chip_part=chip_part) |
0 commit comments