搭好了初步框架

main
mula.liu 2025-12-16 13:27:54 +08:00
parent 2b5b3cd7b6
commit 39997574b5
23 changed files with 923 additions and 0 deletions

BIN
.DS_Store vendored 100644

Binary file not shown.

96
PROJECT.md 100644
View File

@ -0,0 +1,96 @@
2. 技术方案书
以下是为您规划的 《DJI Cloud API 设备模拟器技术实施方案》。
2.1 系统架构设计
系统采用 C/S 架构 或 B/S 架构 均可。考虑到您要求“简单的图形化界面”且需要模拟硬件底层通信Python + Qt (PySide6) 是开发成本最低且性能最好的选择(也可选 Electron + Node.js
模拟器端 (Device Simulator): 扮演“网关”角色,模拟 Aircraft (无人机) 和 Dock (机场) 的行为。
通信层: 封装 MQTT Client负责与您的云平台 Broker 建立长连接。
业务层: 包含设备状态机、航线执行引擎、媒体上报模块。
2.2 功能模块规划
我们将模拟器分为四个核心模块:
1. 连接与鉴权模块 (Connection Manager)
功能: 输入设备SN、Secret、云平台地址IP/Port实现MQTT登录。
模拟对象: 支持配置为“直连设备”如M30/M3E系列通过遥控器上云或“网关子设备”如Dock 2 + M3D无人机
心跳机制: 自动发送 Ping 包,维持在线状态。
2. 物模型引擎 (Thing Model Engine) 这是模拟器的核心,用于生成符合文档的数据:
OSD 发生器 (1Hz):
模拟经纬度支持在地图上画点或加载GPX路径让飞机“动”起来
模拟姿态角Pitch/Roll/Yaw
模拟电池信息(电量随飞行时间线性递减)。
State 上报 (事件驱动):
当设备状态变化时(如从 Idle -> TakingOff立即推送 State Topic。
模拟 Dock 2 的舱盖状态、急停按钮状态、雨量传感器报警等。
3. 指令响应系统 (Service Handler) 完全模拟真实设备的指令闭环:
航线下发: 接收云端 flighttask_prepare 指令,校验参数,回复 result: 0 (成功)。
任务执行: 收到 flighttask_execute 后,开启内部定时器,按航线坐标更新 OSD 位置,模拟飞行过程。
云台控制: 接收 live_control 指令,改变模拟摄像头的朝向数据。
4. 媒体交互模拟 (Media Mock)
功能: 模拟无人机拍摄照片/视频并上传。
流程:
接收云端“媒体上传”指令或航线结束自动触发。
模拟器读取本地一张静态图片(如一张风景图)。
请求云端获取对象存储OSS/S3的临时凭证STS
通过 HTTPS PUT 方法将图片上传到云平台指定的 URL。
向云端汇报“上传完成”。
2.3 界面设计 (GUI Mockup)
界面应包含三个主要区域:
左侧:设备列表与配置
显示当前模拟的 Dock 2 和 无人机 SN。
连接状态指示灯(在线/离线)。
配置面板:输入 MQTT 地址、AppID、AppKey。
中间:状态可视化与地图
地图组件: 显示无人机当前位置图标、模拟的飞行轨迹。
仪表盘: 显示高度、速度、电量、卫星数。
Dock状态: 图形化显示舱盖(开/关)、推杆位置、充电状态。
右侧:调试与日志
控制面板: 提供滑块手动调节“电量”、“信号强度”;提供按钮触发“任务结束”、“返航”、“遭遇障碍物”等事件。
通信日志: 实时滚动显示发送和接收的 MQTT JSON 报文(方便您调试平台端解析逻辑)。
2.4 技术栈推荐
编程语言: Python 3.10+ (开发效率高适合处理JSON和网络IO)。
GUI 框架: PySide6 (Qt for Python) - 界面美观,支持多线程(防界面卡顿)。
网络库: paho-mqtt (处理MQTT 5.0), requests (处理HTTPS上传)。
地图组件: PyQtWebEngine 加载 Leaflet 或 高德地图 Web API。

1
dji_simulator 160000

@ -0,0 +1 @@
Subproject commit 2b5b3cd7b617196ada0cf6f80473b4361e5fe556

5
requirements.txt 100644
View File

@ -0,0 +1,5 @@
PySide6
paho-mqtt
requests
pytest
pytest-qt

0
src/__init__.py 100644
View File

View File

View File

@ -0,0 +1,117 @@
import paho.mqtt.client as mqtt
import logging
import time
from threading import Thread, Event
logger = logging.getLogger(__name__)
class ConnectionManager:
def __init__(self):
self.client = None
self.connected = False
self._stop_event = Event()
self._heartbeat_thread = None
self._message_handler = None
def set_message_handler(self, handler):
self._message_handler = handler
def connect(self, broker_ip: str, client_id: str, port: int = 1883, username: str = None, password: str = None):
"""
Connects to the MQTT Broker.
"""
if self.client:
self.disconnect()
# Create a new MQTT Client instance
# Protocol V5 is recommended for modern deployments, but V3.1.1 is standard.
# Using default (usually 3.1.1 or 5.0 depending on paho version/negotiation).
self.client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv311)
if username and password:
self.client.username_pw_set(username, password)
# Set callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
logger.info(f"Connecting to {broker_ip}:{port} as {client_id}...")
try:
self.client.connect(broker_ip, port, keepalive=60)
self.client.loop_start() # Start the network loop in a background thread
except Exception as e:
logger.error(f"Failed to connect: {e}")
raise
def disconnect(self):
"""
Disconnects from the broker.
"""
self._stop_heartbeat()
if self.client:
self.client.loop_stop()
self.client.disconnect()
self.client = None
self.connected = False
logger.info("Disconnected.")
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
logger.info("Connected successfully.")
self._start_heartbeat()
# Subscribe to services if needed, but usually caller handles subscription
# or we do it here if we know the topic.
# For now, we rely on caller or hardcoded.
# Let's subscribe to everything for this simulator or specific service topic
# client.subscribe("#")
# Ideally, we should let the ServiceHandler subscribe.
else:
self.connected = False
logger.error(f"Connection failed with result code {rc}")
def _on_disconnect(self, client, userdata, rc):
self.connected = False
self._stop_heartbeat()
if rc != 0:
logger.warning("Unexpected disconnection.")
else:
logger.info("Disconnected gracefully.")
def _on_message(self, client, userdata, msg):
logger.debug(f"Received message on {msg.topic}: {msg.payload}")
if self._message_handler:
self._message_handler(msg.topic, msg.payload)
def _on_publish(self, client, userdata, mid):
pass
def _start_heartbeat(self):
self._stop_event.clear()
self._heartbeat_thread = Thread(target=self._heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
def _stop_heartbeat(self):
self._stop_event.set()
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=1.0)
self._heartbeat_thread = None
def _heartbeat_loop(self):
logger.info("Heartbeat loop started.")
while not self._stop_event.is_set():
if self.client and self.connected:
# Actual topic/payload depends on DJI API specs.
# For now, we simulate a ping or minimal publish.
# Usually MQTT keepalive handles PINGREQ/PINGRESP automatically.
# But sometimes application-level heartbeat is needed.
# We will rely on Paho's built-in keepalive for network ping,
# but if an app-level heartbeat is needed (e.g. reporting state), we do it here.
# For now, let's just log or sleep.
pass
time.sleep(5)
def is_connected(self):
return self.connected

View File

@ -0,0 +1,31 @@
import requests
import logging
import os
logger = logging.getLogger(__name__)
class MediaUploader:
def upload_file(self, upload_url, file_path):
"""
Uploads a file to the specified URL using HTTP PUT.
"""
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
return False
logger.info(f"Starting upload of {file_path} to {upload_url}")
try:
with open(file_path, 'rb') as f:
# DJI Cloud API usually uses PUT for OSS upload
response = requests.put(upload_url, data=f)
if response.status_code in [200, 201]:
logger.info("Upload successful.")
return True
else:
logger.error(f"Upload failed: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Upload error: {e}")
return False

View File

@ -0,0 +1,70 @@
import time
import random
class OsdGenerator:
def __init__(self, start_lat=22.5431, start_lon=113.957, start_alt=0.0):
# Default to DJI HQ coordinates approx
self.latitude = start_lat
self.longitude = start_lon
self.altitude = start_alt
self.battery = 100.0
self.pitch = 0.0
self.roll = 0.0
self.yaw = 0.0
self.speed_h = 0.0
self.speed_v = 0.0
# State
self.is_flying = False
def update(self, dt=1.0):
"""
Update state by delta time (seconds).
"""
if self.is_flying:
# Simulate battery drain (e.g., 1% every 30 seconds)
drain_rate = 1.0 / 30.0
self.battery = max(0.0, self.battery - (drain_rate * dt))
# Simulate slight hover drift or movement if we had velocity
# For now, just drift altitude slightly
self.altitude += random.uniform(-0.1, 0.1)
if self.altitude < 0: self.altitude = 0
# Simulate slight attitude noise
self.pitch = random.uniform(-1.0, 1.0)
self.roll = random.uniform(-1.0, 1.0)
else:
# On ground charging or idle
if self.battery < 100:
self.battery += 0.5 * dt # Charge
if self.battery > 100: self.battery = 100.0
self.altitude = 0.0
self.pitch = 0.0
self.roll = 0.0
def get_osd_data(self):
"""
Return a dict compatible with DJI Cloud API OSD format (simplified).
"""
return {
"latitude": self.latitude,
"longitude": self.longitude,
"height": self.altitude,
"battery_percent": int(self.battery),
"attitude_pitch": round(self.pitch, 2),
"attitude_roll": round(self.roll, 2),
"attitude_yaw": round(self.yaw, 2),
"horizontal_speed": self.speed_h,
"vertical_speed": self.speed_v
}
def start_fly(self):
self.is_flying = True
self.altitude = 5.0 # Takeoff to 5m
def land(self):
self.is_flying = False

View File

@ -0,0 +1,61 @@
import json
import logging
logger = logging.getLogger(__name__)
class ServiceHandler:
def __init__(self, connection_manager, sn):
self.connection_manager = connection_manager
self.sn = sn
def handle_message(self, topic, payload):
"""
Handle incoming MQTT messages.
"""
try:
data = json.loads(payload)
logger.debug(f"Received service message: {data}")
# Simple dispatch based on 'method' field (common in DJI API)
method = data.get("method")
if method == "flighttask_prepare":
self.handle_flighttask_prepare(data)
elif method == "flighttask_execute":
self.handle_flighttask_execute(data)
elif method == "live_start":
self.handle_live_start(data)
else:
logger.warning(f"Unknown method: {method}")
except json.JSONDecodeError:
logger.error("Failed to decode JSON payload")
def handle_flighttask_prepare(self, data):
logger.info("Handling flighttask_prepare...")
# Reply with success
reply_topic = f"sys/product/{self.sn}/services_reply" # Hypothetical reply topic
# Or if it's a request/response pattern, check 'tid' (transaction id)
tid = data.get("tid")
bid = data.get("bid")
reply = {
"tid": tid,
"bid": bid,
"method": "flighttask_prepare",
"data": {
"result": 0
}
}
self.connection_manager.client.publish(reply_topic, json.dumps(reply))
logger.info(f"Replied to flighttask_prepare on {reply_topic}")
def handle_flighttask_execute(self, data):
logger.info("Handling flighttask_execute... (Not implemented yet)")
# Start flight logic here
pass
def handle_live_start(self, data):
logger.info("Handling live_start... (Not implemented yet)")
pass

12
src/main.py 100644
View File

@ -0,0 +1,12 @@
import sys
from PySide6.QtWidgets import QApplication
from src.ui.main_window import MainWindow
def main():
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

View File

@ -0,0 +1,239 @@
from PySide6.QtWidgets import (QMainWindow, QWidget, QHBoxLayout, QVBoxLayout,
QFrame, QLabel, QPushButton, QLineEdit, QGroupBox, QFormLayout)
from PySide6.QtCore import Qt, QTimer
from src.core.osd_generator import OsdGenerator
from src.core.connection_manager import ConnectionManager
from src.core.service_handler import ServiceHandler
from src.core.media_uploader import MediaUploader
from src.ui.map_widget import MapWidget
import json
import os
import tempfile
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DJI Cloud API Simulator")
self.resize(1200, 800)
# Logic
self.osd_gen = OsdGenerator()
self.connection_manager = ConnectionManager()
self.service_handler = None
self.media_uploader = MediaUploader()
# Central Widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Main Layout (Horizontal)
main_layout = QHBoxLayout(central_widget)
main_layout.setContentsMargins(10, 10, 10, 10)
main_layout.setSpacing(10)
# --- Left Panel: Configuration ---
self.left_panel = QFrame()
self.left_panel.setFrameShape(QFrame.StyledPanel)
self.left_panel.setFixedWidth(300)
self._init_left_panel()
# --- Middle Panel: Map & Status ---
self.middle_panel = QFrame()
self.middle_panel.setFrameShape(QFrame.StyledPanel)
self.middle_panel.setSizePolicy(
self.middle_panel.sizePolicy().horizontalPolicy(),
self.middle_panel.sizePolicy().verticalPolicy()
)
self._init_middle_panel()
# --- Right Panel: Logs & Debug ---
self.right_panel = QFrame()
self.right_panel.setFrameShape(QFrame.StyledPanel)
self.right_panel.setFixedWidth(300)
self._init_right_panel()
# Add to layout
main_layout.addWidget(self.left_panel)
main_layout.addWidget(self.middle_panel, stretch=1) # Middle panel takes remaining space
main_layout.addWidget(self.right_panel)
# Timer
self.timer = QTimer()
self.timer.timeout.connect(self._on_tick)
self.timer.start(1000) # 1Hz
def _init_left_panel(self):
layout = QVBoxLayout(self.left_panel)
# Device Config Group
config_group = QGroupBox("Device Configuration")
config_layout = QVBoxLayout()
config_layout.addWidget(QLabel("Broker IP:"))
self.input_broker_ip = QLineEdit("127.0.0.1")
config_layout.addWidget(self.input_broker_ip)
config_layout.addWidget(QLabel("Device SN:"))
self.input_sn = QLineEdit("1581F4BM12345")
config_layout.addWidget(self.input_sn)
config_layout.addWidget(QLabel("App ID:"))
self.input_app_id = QLineEdit()
config_layout.addWidget(self.input_app_id)
config_layout.addWidget(QLabel("App Key:"))
self.input_app_key = QLineEdit()
config_layout.addWidget(self.input_app_key)
self.btn_connect = QPushButton("Connect")
self.btn_connect.clicked.connect(self._on_connect_clicked)
config_layout.addWidget(self.btn_connect)
config_group.setLayout(config_layout)
layout.addWidget(config_group)
# Control Group
control_group = QGroupBox("Simulation Control")
control_layout = QVBoxLayout()
self.btn_takeoff = QPushButton("Take Off")
self.btn_takeoff.clicked.connect(self.osd_gen.start_fly)
control_layout.addWidget(self.btn_takeoff)
self.btn_land = QPushButton("Land")
self.btn_land.clicked.connect(self.osd_gen.land)
control_layout.addWidget(self.btn_land)
self.btn_upload = QPushButton("Simulate Media Upload")
self.btn_upload.clicked.connect(self._on_upload_clicked)
control_layout.addWidget(self.btn_upload)
control_group.setLayout(control_layout)
layout.addWidget(control_group)
layout.addStretch()
def _init_middle_panel(self):
layout = QVBoxLayout(self.middle_panel)
# Map Widget
self.map_widget = MapWidget()
self.map_widget.setSizePolicy(
self.map_widget.sizePolicy().horizontalPolicy(),
self.map_widget.sizePolicy().verticalPolicy()
)
layout.addWidget(self.map_widget, stretch=2)
# Dashboard Group
dashboard_group = QGroupBox("Status Dashboard")
dashboard_layout = QFormLayout()
self.lbl_latitude = QLabel("0.0")
self.lbl_longitude = QLabel("0.0")
self.lbl_altitude = QLabel("0.0 m")
self.lbl_battery = QLabel("100 %")
self.lbl_attitude = QLabel("P: 0 R: 0 Y: 0")
dashboard_layout.addRow("Latitude:", self.lbl_latitude)
dashboard_layout.addRow("Longitude:", self.lbl_longitude)
dashboard_layout.addRow("Altitude:", self.lbl_altitude)
dashboard_layout.addRow("Battery:", self.lbl_battery)
dashboard_layout.addRow("Attitude:", self.lbl_attitude)
dashboard_group.setLayout(dashboard_layout)
layout.addWidget(dashboard_group, stretch=1)
def _init_right_panel(self):
layout = QVBoxLayout(self.right_panel)
log_label = QLabel("Communication Logs")
layout.addWidget(log_label)
# Placeholder for logs (e.g., QTextEdit later)
self.log_area = QLabel("Log Output...")
self.log_area.setAlignment(Qt.AlignTop | Qt.AlignLeft)
self.log_area.setStyleSheet("background-color: #fff; border: 1px solid #999;")
self.log_area.setWordWrap(True)
layout.addWidget(self.log_area, stretch=1)
self.btn_clear_logs = QPushButton("Clear Logs")
self.btn_clear_logs.clicked.connect(lambda: self.log_area.setText(""))
layout.addWidget(self.btn_clear_logs)
def _on_connect_clicked(self):
if not self.connection_manager.is_connected():
ip = self.input_broker_ip.text()
sn = self.input_sn.text()
# Init Service Handler
self.service_handler = ServiceHandler(self.connection_manager, sn)
self.connection_manager.set_message_handler(self.service_handler.handle_message)
# For now, minimal auth
try:
self.connection_manager.connect(ip, sn)
# Subscribe to services
service_topic = f"thing/product/{sn}/services"
self.connection_manager.client.subscribe(service_topic)
self.btn_connect.setText("Disconnect")
self.log_area.setText(f"Connected to {ip}\nSubscribed to {service_topic}")
except Exception as e:
self.log_area.setText(f"Connection Failed: {e}")
else:
self.connection_manager.disconnect()
self.service_handler = None
self.btn_connect.setText("Connect")
self.log_area.setText("Disconnected")
def _on_tick(self):
self.osd_gen.update()
data = self.osd_gen.get_osd_data()
self.lbl_latitude.setText(str(data["latitude"]))
self.lbl_longitude.setText(str(data["longitude"]))
self.lbl_altitude.setText(f"{data['height']:.2f} m")
self.lbl_battery.setText(f"{data['battery_percent']} %")
self.lbl_attitude.setText(f"P: {data['attitude_pitch']} R: {data['attitude_roll']} Y: {data['attitude_yaw']}")
# Update Map
self.map_widget.update_position(data["latitude"], data["longitude"])
if self.connection_manager.is_connected():
sn = self.input_sn.text()
topic = f"sys/product/{sn}/osd"
payload = json.dumps(data)
self.connection_manager.client.publish(topic, payload)
# Optional: Log to UI (truncated)
# self.log_area.setText(f"Published to {topic}:\n{payload}")
def _on_upload_clicked(self):
# Create a dummy file
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(b"Mock Image Data")
tmp_path = tmp.name
# Mock URL
url = "http://example.com/upload-mock" # This will fail in reality, but logs will show attempt
self.log_area.setText(f"Uploading {tmp_path} to {url}...")
# Run in thread? For now, blocking is okay for simulator demo, but better async.
# Just calling it directly for simplicity.
success = self.media_uploader.upload_file(url, tmp_path)
if success:
self.log_area.setText(self.log_area.text() + "\nUpload Success!")
else:
self.log_area.setText(self.log_area.text() + "\nUpload Failed (Expected if no server).")
os.remove(tmp_path)

43
src/ui/map.html 100644
View File

@ -0,0 +1,43 @@
<!DOCTYPE html>
<html>
<head>
<title>Map</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
crossorigin=""/>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"
integrity="sha256-20nQCchB9co0qIjJZRGuk2/Z9VM+kNiyxNV1lvTlZBo="
crossorigin=""></script>
<style>
body { padding: 0; margin: 0; }
html, body, #map { height: 100%; width: 100%; }
</style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([22.5431, 113.957], 13);
L.tileLayer('https://tile.openstreetmap.org/{z}/{x}/{y}.png', {
maxZoom: 19,
attribution: '&copy; <a href="http://www.openstreetmap.org/copyright">OpenStreetMap</a>'
}).addTo(map);
var droneIcon = L.icon({
iconUrl: 'https://unpkg.com/leaflet@1.9.4/dist/images/marker-icon.png',
iconSize: [25, 41],
iconAnchor: [12, 41],
popupAnchor: [1, -34],
});
var droneMarker = L.marker([22.5431, 113.957], {icon: droneIcon}).addTo(map);
function updateDronePosition(lat, lon) {
var newLatLng = new L.LatLng(lat, lon);
droneMarker.setLatLng(newLatLng);
map.panTo(newLatLng);
}
</script>
</body>
</html>

View File

@ -0,0 +1,24 @@
import os
from PySide6.QtWidgets import QWidget, QVBoxLayout
from PySide6.QtWebEngineWidgets import QWebEngineView
from PySide6.QtCore import QUrl
class MapWidget(QWidget):
def __init__(self):
super().__init__()
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.web_view = QWebEngineView()
self.layout.addWidget(self.web_view)
# Load local HTML
current_dir = os.path.dirname(os.path.abspath(__file__))
html_path = os.path.join(current_dir, "map.html")
self.web_view.setUrl(QUrl.fromLocalFile(html_path))
def update_position(self, lat, lon):
# Call JS function
# Ensure lat/lon are floats
script = f"updateDronePosition({lat}, {lon});"
self.web_view.page().runJavaScript(script)

View File

View File

View File

@ -0,0 +1,27 @@
import pytest
from PySide6.QtWidgets import QMainWindow, QLineEdit, QPushButton
from src.ui.main_window import MainWindow
def test_main_window_initialization(qtbot):
"""
Test if the main window initializes correctly with the expected title and widgets.
"""
window = MainWindow()
qtbot.addWidget(window)
# Check title
assert window.windowTitle() == "DJI Cloud API Simulator"
# Check if key widgets are present (using object traversal or just knowing they were created)
# Since we didn't assign object names or make all of them public attributes,
# we can check the public ones we did create in _init_left_panel
assert isinstance(window.input_broker_ip, QLineEdit)
assert window.input_broker_ip.text() == "127.0.0.1"
assert isinstance(window.btn_connect, QPushButton)
assert window.btn_connect.text() == "Connect"
# Check geometry/visibility (optional, but good sanity check)
assert window.width() >= 800
assert window.height() >= 600

View File

@ -0,0 +1,55 @@
import pytest
from unittest.mock import MagicMock, patch
from src.ui.main_window import MainWindow
def test_on_tick_updates_ui_and_publishes(qtbot):
with patch("src.ui.main_window.ConnectionManager") as mock_conn_mgr_cls, \
patch("src.ui.main_window.OsdGenerator") as mock_osd_cls:
# Setup mocks
mock_conn_instance = mock_conn_mgr_cls.return_value
mock_conn_instance.is_connected.return_value = True
mock_osd_instance = mock_osd_cls.return_value
mock_osd_instance.get_osd_data.return_value = {
"latitude": 22.0, "longitude": 113.0, "height": 10.0,
"battery_percent": 90, "attitude_pitch": 1.0,
"attitude_roll": 2.0, "attitude_yaw": 3.0,
"horizontal_speed": 0, "vertical_speed": 0
}
window = MainWindow()
qtbot.addWidget(window)
# Force a tick
window._on_tick()
# Check UI updates
assert window.lbl_latitude.text() == "22.0"
assert window.lbl_altitude.text() == "10.00 m"
# Check Publish
mock_conn_instance.client.publish.assert_called()
args = mock_conn_instance.client.publish.call_args
# Topic should contain SN (default 1581F4BM12345)
assert "sys/product/1581F4BM12345/osd" in args[0][0]
# Payload should contain json data
assert "22.0" in args[0][1]
def test_connect_button_toggle(qtbot):
with patch("src.ui.main_window.ConnectionManager") as mock_conn_mgr_cls:
mock_conn_instance = mock_conn_mgr_cls.return_value
mock_conn_instance.is_connected.side_effect = [False, True] # First call False, then True
window = MainWindow()
qtbot.addWidget(window)
# Initial state
assert window.btn_connect.text() == "Connect"
# Click connect
window.btn_connect.click()
# Should call connect
mock_conn_instance.connect.assert_called()
assert window.btn_connect.text() == "Disconnect"

View File

@ -0,0 +1,32 @@
import pytest
from unittest.mock import patch, MagicMock
from src.core.media_uploader import MediaUploader
import os
def test_upload_success(tmp_path):
# Create dummy file
f = tmp_path / "test.jpg"
f.write_text("dummy image data")
uploader = MediaUploader()
with patch("src.core.media_uploader.requests.put") as mock_put:
mock_put.return_value.status_code = 200
result = uploader.upload_file("http://mock-upload-url.com", str(f))
assert result is True
mock_put.assert_called_once()
def test_upload_failure(tmp_path):
f = tmp_path / "test.jpg"
f.write_text("dummy")
uploader = MediaUploader()
with patch("src.core.media_uploader.requests.put") as mock_put:
mock_put.return_value.status_code = 403
result = uploader.upload_file("http://mock-upload-url.com", str(f))
assert result is False

View File

@ -0,0 +1,49 @@
import pytest
from unittest.mock import MagicMock, patch
from src.core.connection_manager import ConnectionManager
@pytest.fixture
def connection_manager():
return ConnectionManager()
def test_connect_calls_client_connect(connection_manager):
with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls:
mock_client_instance = mock_client_cls.return_value
connection_manager.connect("127.0.0.1", "test_client")
mock_client_cls.assert_called_with(client_id="test_client", protocol=4) # protocol=4 is MQTTv311 usually, check paho constants if strict
mock_client_instance.connect.assert_called_with("127.0.0.1", 1883, keepalive=60)
mock_client_instance.loop_start.assert_called_once()
def test_disconnect_calls_client_disconnect(connection_manager):
with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls:
mock_client_instance = mock_client_cls.return_value
connection_manager.connect("127.0.0.1", "test_client")
connection_manager.disconnect()
mock_client_instance.loop_stop.assert_called_once()
mock_client_instance.disconnect.assert_called_once()
assert connection_manager.client is None
assert connection_manager.connected is False
def test_on_connect_callback(connection_manager):
with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls:
# We don't need to mock the instance methods deeply, just the flow.
connection_manager.connect("127.0.0.1", "test_client")
# Simulate successful connection callback
# on_connect signature: client, userdata, flags, rc
connection_manager._on_connect(None, None, None, 0)
assert connection_manager.connected is True
def test_on_connect_callback_failure(connection_manager):
with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls:
connection_manager.connect("127.0.0.1", "test_client")
# Simulate failed connection callback (rc != 0)
connection_manager._on_connect(None, None, None, 5)
assert connection_manager.connected is False

View File

@ -0,0 +1,31 @@
import pytest
from src.core.osd_generator import OsdGenerator
def test_initial_state():
gen = OsdGenerator()
data = gen.get_osd_data()
assert data["battery_percent"] == 100
assert data["height"] == 0.0
assert not gen.is_flying
def test_flight_update():
gen = OsdGenerator()
gen.start_fly()
assert gen.is_flying
assert gen.altitude > 0
# Update for 30 seconds
gen.update(dt=30.0)
data = gen.get_osd_data()
# Battery should have dropped approx 1%
assert data["battery_percent"] <= 99
def test_charging_on_ground():
gen = OsdGenerator()
gen.battery = 50.0
gen.is_flying = False
gen.update(dt=10.0)
assert gen.battery > 50.0

View File

@ -0,0 +1,30 @@
import pytest
import json
from unittest.mock import MagicMock
from src.core.service_handler import ServiceHandler
def test_flighttask_prepare_reply():
mock_conn = MagicMock()
sn = "123456"
handler = ServiceHandler(mock_conn, sn)
# Simulate inbound message
topic = f"thing/product/{sn}/services"
payload = json.dumps({
"tid": "abc-123",
"bid": "xyz-789",
"method": "flighttask_prepare",
"data": {}
})
handler.handle_message(topic, payload)
# Check if reply was published
mock_conn.client.publish.assert_called_once()
args = mock_conn.client.publish.call_args
reply_topic = args[0][0]
reply_payload = json.loads(args[0][1])
assert reply_topic == f"sys/product/{sn}/services_reply"
assert reply_payload["tid"] == "abc-123"
assert reply_payload["data"]["result"] == 0