From cd65497ac426dffa664c68ef5295b38fc377ac36 Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Tue, 27 Aug 2024 07:02:49 -0500 Subject: [PATCH] Update to move setup to its own module --- test/end2end/setup.py | 59 +++++++++++++++++++++++++++++ test/end2end/test.py | 86 +++++++++++-------------------------------- 2 files changed, 80 insertions(+), 65 deletions(-) create mode 100644 test/end2end/setup.py diff --git a/test/end2end/setup.py b/test/end2end/setup.py new file mode 100644 index 000000000..2c54f5d82 --- /dev/null +++ b/test/end2end/setup.py @@ -0,0 +1,59 @@ +import time + +import flash +import meshtastic +import meshtastic.serial_interface +from readprops import readProps + +version = readProps("version.properties")["long"] + + +def setup_users_prefs(prefsLoc): + with open(prefsLoc, "r") as file: + filedata = file.read() + filedata = filedata.replace( + "// #define CONFIG_LORA_REGION_USERPREFS", + "#define CONFIG_LORA_REGION_USERPREFS", + ) + filedata = filedata.replace( + "// #define LORACONFIG_CHANNEL_NUM_USERPREFS", + "#define LORACONFIG_CHANNEL_NUM_USERPREFS", + ) + filedata = filedata.replace( + "// #define CHANNEL_0_PRECISION", "#define CHANNEL_0_PRECISION" + ) + with open(prefsLoc, "w") as file: + file.write(filedata) + + +def setup_device(port, pio_env, arch): + interface = meshtastic.serial_interface.SerialInterface(port) + try: + interface.waitForConfig() + if interface.metadata.firmware_version == version: + print("Already at local ref version", version) + else: + print( + "Device has version", + interface.metadata.firmware_version, + " updating to", + version, + ) + interface.close() + time.sleep(1) + flash_device(port, pio_env, arch) + time.sleep(2) + except: + interface.close() + time.sleep(1) + flash_device(port, pio_env, arch) + time.sleep(2) + interface = meshtastic.serial_interface.SerialInterface(port) + interface.waitForConfig() + + +def flash_device(port, pio_env, arch): + if arch == "esp32": + flash.flash_esp32(pio_env=pio_env, port=port) + elif arch == "nrf52": + flash.flash_nrf52(pio_env=pio_env, port=port) diff --git a/test/end2end/test.py b/test/end2end/test.py index 9be118c0e..fb61d2c13 100644 --- a/test/end2end/test.py +++ b/test/end2end/test.py @@ -1,22 +1,12 @@ import time from typing import Dict, List, NamedTuple -import flash import meshtastic import meshtastic.serial_interface import pytest +from dotmap import DotMap from pubsub import pub # type: ignore[import-untyped] -from readprops import readProps - -version = readProps("version.properties")["long"] - -heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"] -rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] -tbeam = ["COM18", "tbeam", "esp32"] - -for port in [heltec_v3, rak4631]: - print("Flashing device", port) - flash.flash_esp32(pio_env=port[1], port=port[0]) +from setup import setup_device, setup_users_prefs # type: ignore[import-untyped] class ConnectedDevice(NamedTuple): @@ -28,19 +18,17 @@ class ConnectedDevice(NamedTuple): devices: Dict[str, ConnectedDevice] = {} -# Set up testnet channel and lora config for test harness -# device.interface.localNode.beginSettingsTransaction() -# time.sleep(1) -# device.interface.localNode.setURL( -# "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" -# ) -# # time.sleep(1) -# # device_config = device.interface.localNode.localConfig.device -# # device_config.debug_log_enabled = True -# # device.interface.localNode.writeConfig(device_config) -# # todo security debug_log_enabled -# device.interface.localNode.commitSettingsTransaction() -# time.sleep(1) + +heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"] +rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] +tbeam = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] + + +setup_users_prefs("userPrefs.h") + +for port_device in [heltec_v3, rak4631]: + print("Setting up device", port_device[1], "on port", port_device[0]) + setup_device(port=port_device[0], pio_env=port_device[1], arch=port_device[2]) @pytest.fixture(scope="module", params=[rak4631, heltec_v3]) @@ -60,31 +48,6 @@ def device(request): interface=meshtastic.serial_interface.SerialInterface(port), mesh_packets=[], ) - # pub.subscribe(onReceive, "meshtastic.receive") - devices[port].interface.waitForConfig() - if devices[port].interface.metadata.firmware_version == version: - print("Already at local ref version", version) - else: - print( - "Device has version", - devices[port].interface.metadata.firmware_version, - " updating to", - version, - ) - devices[port].interface.close() - # Set up device - if arch == "esp32": - flash.flash_esp32(pio_env=pio_env, port=port) - elif arch == "nrf52": - flash.flash_nrf52(pio_env=pio_env, port=port) - # factory reset - devices[port] = ConnectedDevice( - port=port, - pio_env=pio_env, - arch=arch, - interface=meshtastic.serial_interface.SerialInterface(port), - mesh_packets=[], - ) yield devices[port] # Tear down devices devices[port].interface.close() @@ -105,18 +68,7 @@ def test_should_get_and_set_config(device: ConnectedDevice): assert ( len(device.interface.localNode.channels) > 0 ), "Expected at least one channel in the device" - device.interface.localNode.setURL( - "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" - ) - time.sleep(1) pub.subscribe(default_on_receive, "meshtastic.receive") - # pub.subscribe( - # lambda packet: { - # print(device.pio_env, "Received packet", packet), - # device.interface.mesh_packets.append(packet), - # }, - # "meshtastic.receive", - # ) def default_on_receive(packet, interface): @@ -152,10 +104,14 @@ def test_should_send_text_message_and_receive_ack(device: ConnectedDevice): len(textPackets) > 0 ), "Expected a text message received on other device" # Assert should have received an ack - # ackPackets = list(filter( - # lambda packet: packet["decoded"]["portnum"] == meshtastic.portnums_pb2.ROUTING_APP, device.mesh_packets - # )) - # assert len(ackPackets) > 0, "Expected an ack from the device" + ackPackets = list( + filter( + lambda packet: packet["decoded"]["portnum"] + == meshtastic.portnums_pb2.ROUTING_APP, + device.mesh_packets, + ) + ) + assert len(ackPackets) > 0, "Expected an ack from the device" if __name__ == "__main__":