mirror of
				https://github.com/meshtastic/firmware.git
				synced 2025-10-28 23:34:03 +00:00 
			
		
		
		
	 b0fe5ef8ba
			
		
	
	
		b0fe5ef8ba
		
			
		
	
	
	
	
		
			
			* Initial commit of a fuzzer for Meshtastic. * Use a max of 5 for the phone queues * Only write files to the temp dir * Limitless queue + fuzzer = lots of ram :) * Use $PIO_ENV for path to program * spelling: s/is/to/ * Use loopCanSleep instead of a lock in Router * realHardware allows full use of a CPU core * Ignore checkov CKV_DOCKER_2 & CKV_DOCKER_3 * Add Atak seed * Fix lint issues in build.sh * Use exception to exit from portduino_main * Separate build & source files into $WORK & $SRC * Use an ephemeral port for the API server * Include CXXFLAGS in the link step * Read all shared libraries * Use a separate work directory for each sanitizer --------- Co-authored-by: Ben Meadors <benmmeadors@gmail.com>
		
			
				
	
	
		
			169 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			169 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Generate an initial set of MeshPackets.
 | |
| 
 | |
| The fuzzer uses these MeshPackets as an initial seed of test candidates.
 | |
| 
 | |
| It's also good to add any previously discovered crash test cases to this list
 | |
| to avoid future regressions.
 | |
| 
 | |
| If left unset, the following values will be automatically set by the fuzzer.
 | |
|   - to: automatically set to the running node's NodeID
 | |
|   - from: automatically set to the running node's NodeID
 | |
|   - id: automatically set to the value of an incrementing counter
 | |
| 
 | |
| Additionally, if `pki_encrypted` is populated in the packet, the first admin key
 | |
| will be copied into the `public_key` field.
 | |
| """
 | |
| 
 | |
| import base64
 | |
| 
 | |
| from meshtastic import BROADCAST_NUM
 | |
| from meshtastic.protobuf import (
 | |
|     admin_pb2,
 | |
|     atak_pb2,
 | |
|     mesh_pb2,
 | |
|     portnums_pb2,
 | |
|     telemetry_pb2,
 | |
| )
 | |
| 
 | |
| 
 | |
| def From(node: int = 9):
 | |
|     """Return a dict suitable for **kwargs for populating the 'from' field.
 | |
| 
 | |
|     'from' is a reserved keyword in Python. It can't be used directly as an
 | |
|     argument to the MeshPacket constructor. Rather **From() can be used as
 | |
|     the final argument to provide the from node as a **kwarg.
 | |
| 
 | |
|     Defaults to 9 if no value is provided.
 | |
|     """
 | |
|     return {"from": node}
 | |
| 
 | |
| 
 | |
| packets = (
 | |
|     (
 | |
|         "position",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.POSITION_APP,
 | |
|                 payload=mesh_pb2.Position(
 | |
|                     latitude_i=int(1 * 1e7),
 | |
|                     longitude_i=int(2 * 1e7),
 | |
|                     altitude=5,
 | |
|                     precision_bits=32,
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             to=BROADCAST_NUM,
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "telemetry",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.TELEMETRY_APP,
 | |
|                 payload=telemetry_pb2.Telemetry(
 | |
|                     time=1736192207,
 | |
|                     device_metrics=telemetry_pb2.DeviceMetrics(
 | |
|                         battery_level=101,
 | |
|                         channel_utilization=8,
 | |
|                         air_util_tx=2,
 | |
|                         uptime_seconds=42,
 | |
|                     ),
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             to=BROADCAST_NUM,
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "text",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
 | |
|                 payload=b"Hello world",
 | |
|             ),
 | |
|             to=BROADCAST_NUM,
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "user",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.NODEINFO_APP,
 | |
|                 payload=mesh_pb2.User(
 | |
|                     id="!00000009",
 | |
|                     long_name="Node 9",
 | |
|                     short_name="N9",
 | |
|                     macaddr=b"\x00\x00\x00\x00\x00\x09",
 | |
|                     hw_model=mesh_pb2.HardwareModel.RAK4631,
 | |
|                     public_key=base64.b64decode(
 | |
|                         "L0ih/6F41itofdE8mYyHk1SdfOJ/QRM1KQ+pO4vEEjQ="
 | |
|                     ),
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "traceroute",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.TRACEROUTE_APP,
 | |
|                 payload=mesh_pb2.RouteDiscovery(
 | |
|                     route=[10],
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "routing",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.ROUTING_APP,
 | |
|                 payload=mesh_pb2.Routing(
 | |
|                     error_reason=mesh_pb2.Routing.NO_RESPONSE,
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "admin",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.ADMIN_APP,
 | |
|                 payload=admin_pb2.AdminMessage(
 | |
|                     get_owner_request=True,
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             pki_encrypted=True,
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
|     (
 | |
|         "atak",
 | |
|         mesh_pb2.MeshPacket(
 | |
|             decoded=mesh_pb2.Data(
 | |
|                 portnum=portnums_pb2.PortNum.ATAK_PLUGIN,
 | |
|                 payload=atak_pb2.TAKPacket(
 | |
|                     is_compressed=True,
 | |
|                     # Note, the strings are not valid for a compressed message, but will
 | |
|                     # give the fuzzer a starting point.
 | |
|                     contact=atak_pb2.Contact(
 | |
|                         callsign="callsign", device_callsign="device_callsign"
 | |
|                     ),
 | |
|                     chat=atak_pb2.GeoChat(
 | |
|                         message="message", to="to", to_callsign="to_callsign"
 | |
|                     ),
 | |
|                 ).SerializeToString(),
 | |
|             ),
 | |
|             **From(),
 | |
|         ),
 | |
|     ),
 | |
| )
 | |
| 
 | |
| for name, packet in packets:
 | |
|     with open(f"{name}.MeshPacket", "wb") as f:
 | |
|         f.write(packet.SerializeToString())
 |