From 3c90a65a66c7fb4cbb926aca5196faf158d3d429 Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Tue, 27 Aug 2024 06:24:36 -0500 Subject: [PATCH] WIP --- test/end2end/test.py | 87 +++++++++++++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 26 deletions(-) diff --git a/test/end2end/test.py b/test/end2end/test.py index 09bb8ef14..9be118c0e 100644 --- a/test/end2end/test.py +++ b/test/end2end/test.py @@ -14,6 +14,10 @@ heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"] rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] tbeam = ["COM18", "tbeam", "esp32"] +for port in [heltec_v3, rak4631]: + print("Flashing device", port) + flash.flash_esp32(pio_env=port[1], port=port[0]) + class ConnectedDevice(NamedTuple): port: str @@ -24,9 +28,22 @@ class ConnectedDevice(NamedTuple): devices: Dict[str, ConnectedDevice] = {} +# Set up testnet channel and lora config for test harness +# device.interface.localNode.beginSettingsTransaction() +# time.sleep(1) +# device.interface.localNode.setURL( +# "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" +# ) +# # time.sleep(1) +# # device_config = device.interface.localNode.localConfig.device +# # device_config.debug_log_enabled = True +# # device.interface.localNode.writeConfig(device_config) +# # todo security debug_log_enabled +# device.interface.localNode.commitSettingsTransaction() +# time.sleep(1) -@pytest.fixture(scope="module", params=[heltec_v3, rak4631]) +@pytest.fixture(scope="module", params=[rak4631, heltec_v3]) def device(request): port = request.param[0] pio_env = request.param[1] @@ -88,39 +105,57 @@ def test_should_get_and_set_config(device: ConnectedDevice): assert ( len(device.interface.localNode.channels) > 0 ), "Expected at least one channel in the device" - # Set up testnet channel and lora config for test harness - device.interface.localNode.beginSettingsTransaction() - time.sleep(1) device.interface.localNode.setURL( "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" ) - # time.sleep(1) - # device_config = device.interface.localNode.localConfig.device - # device_config.debug_log_enabled = True - # device.interface.localNode.writeConfig(device_config) - # todo security debug_log_enabled - device.interface.localNode.commitSettingsTransaction() time.sleep(1) - pub.subscribe( - lambda packet: { - print("Received packet", packet), - # device.mesh_packets.append(packet), - }, - "meshtastic.receive", - ) + pub.subscribe(default_on_receive, "meshtastic.receive") + # pub.subscribe( + # lambda packet: { + # print(device.pio_env, "Received packet", packet), + # device.interface.mesh_packets.append(packet), + # }, + # "meshtastic.receive", + # ) + + +def default_on_receive(packet, interface): + print("Received packet", packet["decoded"], "interface", interface) + # find the device that sent the packet + for port in devices: + if devices[port].interface == interface: + devices[port].mesh_packets.append(packet) def test_should_send_text_message_and_receive_ack(device: ConnectedDevice): + time.sleep(2) + # Send a text message + print("Sending text from device", device.pio_env) device.interface.sendText(text="Test broadcast", wantAck=True) - time.sleep(5) - # for port in devices: - # if devices[port].interface != device.interface: - # # assert len(devices[port].mesh_packets) > 0 - # # Assert should have received a message - # assert any( - # packet["decoded"]["payload"].decode("utf-8") == "Test broadcast" - # for packet in devices[port].mesh_packets - # ) + time.sleep(2) + for port in devices: + if devices[port].port != device.port: + print("Checking device", devices[port].pio_env, "for received message") + print(devices[port].mesh_packets) + # Assert should have received a message + # find text message in packets + textPackets = list( + filter( + lambda packet: packet["decoded"]["portnum"] + == meshtastic.portnums_pb2.TEXT_MESSAGE_APP + and packet["decoded"]["payload"].decode("utf-8") + == "Test broadcast", + devices[port].mesh_packets, + ) + ) + assert ( + len(textPackets) > 0 + ), "Expected a text message received on other device" + # Assert should have received an ack + # ackPackets = list(filter( + # lambda packet: packet["decoded"]["portnum"] == meshtastic.portnums_pb2.ROUTING_APP, device.mesh_packets + # )) + # assert len(ackPackets) > 0, "Expected an ack from the device" if __name__ == "__main__":