From 9e8f0814ab417843f8cdcb401dd11a9dc909f317 Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Mon, 26 Aug 2024 16:34:28 -0500 Subject: [PATCH] Stuff --- bin/readprops.py | 2 +- test/end2end/flash.py | 29 +++++++++- test/end2end/readprops.py | 42 ++++++++++++++ test/end2end/test.py | 119 +++++++++++++++++++++++++++++++++----- 4 files changed, 173 insertions(+), 19 deletions(-) create mode 100644 test/end2end/readprops.py diff --git a/bin/readprops.py b/bin/readprops.py index 4b730658a..bb0ec69fb 100644 --- a/bin/readprops.py +++ b/bin/readprops.py @@ -39,4 +39,4 @@ def readProps(prefsLoc): return verObj -# print("path is" + ','.join(sys.path)) +# print("path is" + ','.join(sys.path)) \ No newline at end of file diff --git a/test/end2end/flash.py b/test/end2end/flash.py index 11f767133..0af04c20b 100644 --- a/test/end2end/flash.py +++ b/test/end2end/flash.py @@ -16,7 +16,6 @@ def find_usb_device(vendor_id, product_id): # Flash esp32 target def flash_esp32(pio_env, port): - # Flash the ESP32 target # trunk-ignore(bandit/B603) # trunk-ignore(bandit/B607) subprocess.run( @@ -25,7 +24,33 @@ def flash_esp32(pio_env, port): def flash_nrf52(pio_env, port): - # Flash the nrf52 target + # trunk-ignore(bandit/B603) + # trunk-ignore(bandit/B607) + subprocess.run( + ["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port] + ) + + +def find_usb_device(vendor_id, product_id): + # Find USB devices + dev = usb.core.find(find_all=True) + # Loop through devices, printing vendor and product ids in decimal and hex + for cfg in dev: + if cfg.idVendor == vendor_id and cfg.idProduct == product_id: + return cfg + return None + + +# Flash esp32 target +def flash_esp32(pio_env, port): + # trunk-ignore(bandit/B603) + # trunk-ignore(bandit/B607) + subprocess.run( + ["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port] + ) + + +def flash_nrf52(pio_env, port): # trunk-ignore(bandit/B603) # trunk-ignore(bandit/B607) subprocess.run( diff --git a/test/end2end/readprops.py b/test/end2end/readprops.py new file mode 100644 index 000000000..4b730658a --- /dev/null +++ b/test/end2end/readprops.py @@ -0,0 +1,42 @@ +import configparser +import subprocess + + +def readProps(prefsLoc): + """Read the version of our project as a string""" + + config = configparser.RawConfigParser() + config.read(prefsLoc) + version = dict(config.items("VERSION")) + verObj = dict( + short="{}.{}.{}".format(version["major"], version["minor"], version["build"]), + long="unset", + ) + + # Try to find current build SHA if if the workspace is clean. This could fail if git is not installed + try: + sha = ( + subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]) + .decode("utf-8") + .strip() + ) + isDirty = ( + subprocess.check_output(["git", "diff", "HEAD"]).decode("utf-8").strip() + ) + suffix = sha + # if isDirty: + # # short for 'dirty', we want to keep our verstrings source for protobuf reasons + # suffix = sha + "-d" + verObj["long"] = "{}.{}.{}.{}".format( + version["major"], version["minor"], version["build"], suffix + ) + except: + # print("Unexpected error:", sys.exc_info()[0]) + # traceback.print_exc() + verObj["long"] = verObj["short"] + + # print("firmware version " + verStr) + return verObj + + +# print("path is" + ','.join(sys.path)) diff --git a/test/end2end/test.py b/test/end2end/test.py index ef910ae02..09bb8ef14 100644 --- a/test/end2end/test.py +++ b/test/end2end/test.py @@ -1,39 +1,126 @@ +import time +from typing import Dict, List, NamedTuple + import flash import meshtastic import meshtastic.serial_interface import pytest +from pubsub import pub # type: ignore[import-untyped] +from readprops import readProps -# from datetime import datetime +version = readProps("version.properties")["long"] heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"] +rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] tbeam = ["COM18", "tbeam", "esp32"] -rak4631 = ["COM19", "rak4631", "nrf52"] -@pytest.fixture(scope="module", params=[heltec_v3]) +class ConnectedDevice(NamedTuple): + port: str + pio_env: str + arch: str + interface: meshtastic.serial_interface.SerialInterface + mesh_packets: List[meshtastic.mesh_pb2.FromRadio] + + +devices: Dict[str, ConnectedDevice] = {} + + +@pytest.fixture(scope="module", params=[heltec_v3, rak4631]) def device(request): port = request.param[0] pio_env = request.param[1] arch = request.param[2] - # Set up device - if arch == "esp32": - flash.flash_esp32(pio_env=pio_env, port=port) - elif arch == "nrf52": - flash.flash_nrf52(pio_env=pio_env, port=port) - # factory reset - yield meshtastic.serial_interface.SerialInterface(port) + + if devices.get(port) is not None and devices[port].interface.isConnected: + yield devices[port] + else: + time.sleep(1) + devices[port] = ConnectedDevice( + port=port, + pio_env=pio_env, + arch=arch, + interface=meshtastic.serial_interface.SerialInterface(port), + mesh_packets=[], + ) + # pub.subscribe(onReceive, "meshtastic.receive") + devices[port].interface.waitForConfig() + if devices[port].interface.metadata.firmware_version == version: + print("Already at local ref version", version) + else: + print( + "Device has version", + devices[port].interface.metadata.firmware_version, + " updating to", + version, + ) + devices[port].interface.close() + # Set up device + if arch == "esp32": + flash.flash_esp32(pio_env=pio_env, port=port) + elif arch == "nrf52": + flash.flash_nrf52(pio_env=pio_env, port=port) + # factory reset + devices[port] = ConnectedDevice( + port=port, + pio_env=pio_env, + arch=arch, + interface=meshtastic.serial_interface.SerialInterface(port), + mesh_packets=[], + ) + yield devices[port] # Tear down devices + devices[port].interface.close() # Test want_config responses from device -def test_get_info(device): - assert device is not None, "Expected port to be set" - assert len(device.nodes) > 0, "Expected at least one node in the device NodeDB" - assert device.localNode.localConfig is not None, "Expected LocalConfig to be set" - assert device.localNode.moduleConfig is not None, "Expected ModuleConfig to be set" +def test_should_get_and_set_config(device: ConnectedDevice): + assert device is not None, "Expected connected device" assert ( - len(device.localNode.channels) > 0 + len(device.interface.nodes) > 0 + ), "Expected at least one node in the device NodeDB" + assert ( + device.interface.localNode.localConfig is not None + ), "Expected LocalConfig to be set" + assert ( + device.interface.localNode.moduleConfig is not None + ), "Expected ModuleConfig to be set" + assert ( + len(device.interface.localNode.channels) > 0 ), "Expected at least one channel in the device" + # Set up testnet channel and lora config for test harness + device.interface.localNode.beginSettingsTransaction() + time.sleep(1) + device.interface.localNode.setURL( + "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" + ) + # time.sleep(1) + # device_config = device.interface.localNode.localConfig.device + # device_config.debug_log_enabled = True + # device.interface.localNode.writeConfig(device_config) + # todo security debug_log_enabled + device.interface.localNode.commitSettingsTransaction() + time.sleep(1) + pub.subscribe( + lambda packet: { + print("Received packet", packet), + # device.mesh_packets.append(packet), + }, + "meshtastic.receive", + ) + + +def test_should_send_text_message_and_receive_ack(device: ConnectedDevice): + device.interface.sendText(text="Test broadcast", wantAck=True) + time.sleep(5) + # for port in devices: + # if devices[port].interface != device.interface: + # # assert len(devices[port].mesh_packets) > 0 + # # Assert should have received a message + # assert any( + # packet["decoded"]["payload"].decode("utf-8") == "Test broadcast" + # for packet in devices[port].mesh_packets + # ) if __name__ == "__main__":