This commit is contained in:
Ben Meadors 2024-08-27 06:24:36 -05:00
parent 9e8f0814ab
commit 3c90a65a66

View File

@ -14,6 +14,10 @@ heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"]
rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"] rak4631 = ["/dev/cu.usbmodem14201", "rak4631", "nrf52"]
tbeam = ["COM18", "tbeam", "esp32"] tbeam = ["COM18", "tbeam", "esp32"]
for port in [heltec_v3, rak4631]:
print("Flashing device", port)
flash.flash_esp32(pio_env=port[1], port=port[0])
class ConnectedDevice(NamedTuple): class ConnectedDevice(NamedTuple):
port: str port: str
@ -24,9 +28,22 @@ class ConnectedDevice(NamedTuple):
devices: Dict[str, ConnectedDevice] = {} devices: Dict[str, ConnectedDevice] = {}
# Set up testnet channel and lora config for test harness
# device.interface.localNode.beginSettingsTransaction()
# time.sleep(1)
# device.interface.localNode.setURL(
# "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ"
# )
# # time.sleep(1)
# # device_config = device.interface.localNode.localConfig.device
# # device_config.debug_log_enabled = True
# # device.interface.localNode.writeConfig(device_config)
# # todo security debug_log_enabled
# device.interface.localNode.commitSettingsTransaction()
# time.sleep(1)
@pytest.fixture(scope="module", params=[heltec_v3, rak4631]) @pytest.fixture(scope="module", params=[rak4631, heltec_v3])
def device(request): def device(request):
port = request.param[0] port = request.param[0]
pio_env = request.param[1] pio_env = request.param[1]
@ -88,39 +105,57 @@ def test_should_get_and_set_config(device: ConnectedDevice):
assert ( assert (
len(device.interface.localNode.channels) > 0 len(device.interface.localNode.channels) > 0
), "Expected at least one channel in the device" ), "Expected at least one channel in the device"
# Set up testnet channel and lora config for test harness
device.interface.localNode.beginSettingsTransaction()
time.sleep(1)
device.interface.localNode.setURL( device.interface.localNode.setURL(
"https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ" "https://meshtastic.org/e/#CisSIMqU8uiTvxZmoXhh1eOgay0QoT8c5-cwr-XozNr40ZUrGgdUZXN0TmV0EhEIATgBQAJIAVABWB9oAcAGAQ"
) )
# time.sleep(1)
# device_config = device.interface.localNode.localConfig.device
# device_config.debug_log_enabled = True
# device.interface.localNode.writeConfig(device_config)
# todo security debug_log_enabled
device.interface.localNode.commitSettingsTransaction()
time.sleep(1) time.sleep(1)
pub.subscribe( pub.subscribe(default_on_receive, "meshtastic.receive")
lambda packet: { # pub.subscribe(
print("Received packet", packet), # lambda packet: {
# device.mesh_packets.append(packet), # print(device.pio_env, "Received packet", packet),
}, # device.interface.mesh_packets.append(packet),
"meshtastic.receive", # },
) # "meshtastic.receive",
# )
def default_on_receive(packet, interface):
print("Received packet", packet["decoded"], "interface", interface)
# find the device that sent the packet
for port in devices:
if devices[port].interface == interface:
devices[port].mesh_packets.append(packet)
def test_should_send_text_message_and_receive_ack(device: ConnectedDevice): def test_should_send_text_message_and_receive_ack(device: ConnectedDevice):
time.sleep(2)
# Send a text message
print("Sending text from device", device.pio_env)
device.interface.sendText(text="Test broadcast", wantAck=True) device.interface.sendText(text="Test broadcast", wantAck=True)
time.sleep(5) time.sleep(2)
# for port in devices: for port in devices:
# if devices[port].interface != device.interface: if devices[port].port != device.port:
# # assert len(devices[port].mesh_packets) > 0 print("Checking device", devices[port].pio_env, "for received message")
# # Assert should have received a message print(devices[port].mesh_packets)
# assert any( # Assert should have received a message
# packet["decoded"]["payload"].decode("utf-8") == "Test broadcast" # find text message in packets
# for packet in devices[port].mesh_packets textPackets = list(
# ) filter(
lambda packet: packet["decoded"]["portnum"]
== meshtastic.portnums_pb2.TEXT_MESSAGE_APP
and packet["decoded"]["payload"].decode("utf-8")
== "Test broadcast",
devices[port].mesh_packets,
)
)
assert (
len(textPackets) > 0
), "Expected a text message received on other device"
# Assert should have received an ack
# ackPackets = list(filter(
# lambda packet: packet["decoded"]["portnum"] == meshtastic.portnums_pb2.ROUTING_APP, device.mesh_packets
# ))
# assert len(ackPackets) > 0, "Expected an ack from the device"
if __name__ == "__main__": if __name__ == "__main__":