diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..9577b7b Binary files /dev/null and b/.DS_Store differ diff --git a/PROJECT.md b/PROJECT.md new file mode 100644 index 0000000..6237e1a --- /dev/null +++ b/PROJECT.md @@ -0,0 +1,96 @@ +2. 技术方案书 +以下是为您规划的 《DJI Cloud API 设备模拟器技术实施方案》。 + +2.1 系统架构设计 +系统采用 C/S 架构 或 B/S 架构 均可。考虑到您要求“简单的图形化界面”且需要模拟硬件底层通信,Python + Qt (PySide6) 是开发成本最低且性能最好的选择(也可选 Electron + Node.js)。 + +模拟器端 (Device Simulator): 扮演“网关”角色,模拟 Aircraft (无人机) 和 Dock (机场) 的行为。 + +通信层: 封装 MQTT Client,负责与您的云平台 Broker 建立长连接。 + +业务层: 包含设备状态机、航线执行引擎、媒体上报模块。 + +2.2 功能模块规划 +我们将模拟器分为四个核心模块: + +1. 连接与鉴权模块 (Connection Manager) + +功能: 输入设备SN、Secret、云平台地址(IP/Port),实现MQTT登录。 + +模拟对象: 支持配置为“直连设备”(如M30/M3E系列通过遥控器上云)或“网关子设备”(如Dock 2 + M3D无人机)。 + +心跳机制: 自动发送 Ping 包,维持在线状态。 + +2. 物模型引擎 (Thing Model Engine) 这是模拟器的核心,用于生成符合文档的数据: + +OSD 发生器 (1Hz): + +模拟经纬度(支持在地图上画点或加载GPX路径让飞机“动”起来)。 + +模拟姿态角(Pitch/Roll/Yaw)。 + +模拟电池信息(电量随飞行时间线性递减)。 + +State 上报 (事件驱动): + +当设备状态变化时(如从 Idle -> TakingOff),立即推送 State Topic。 + +模拟 Dock 2 的舱盖状态、急停按钮状态、雨量传感器报警等。 + +3. 指令响应系统 (Service Handler) 完全模拟真实设备的指令闭环: + +航线下发: 接收云端 flighttask_prepare 指令,校验参数,回复 result: 0 (成功)。 + +任务执行: 收到 flighttask_execute 后,开启内部定时器,按航线坐标更新 OSD 位置,模拟飞行过程。 + +云台控制: 接收 live_control 指令,改变模拟摄像头的朝向数据。 + +4. 媒体交互模拟 (Media Mock) + +功能: 模拟无人机拍摄照片/视频并上传。 + +流程: + +接收云端“媒体上传”指令或航线结束自动触发。 + +模拟器读取本地一张静态图片(如一张风景图)。 + +请求云端获取对象存储(OSS/S3)的临时凭证(STS)。 + +通过 HTTPS PUT 方法将图片上传到云平台指定的 URL。 + +向云端汇报“上传完成”。 + +2.3 界面设计 (GUI Mockup) +界面应包含三个主要区域: + +左侧:设备列表与配置 + +显示当前模拟的 Dock 2 和 无人机 SN。 + +连接状态指示灯(在线/离线)。 + +配置面板:输入 MQTT 地址、AppID、AppKey。 + +中间:状态可视化与地图 + +地图组件: 显示无人机当前位置图标、模拟的飞行轨迹。 + +仪表盘: 显示高度、速度、电量、卫星数。 + +Dock状态: 图形化显示舱盖(开/关)、推杆位置、充电状态。 + +右侧:调试与日志 + +控制面板: 提供滑块手动调节“电量”、“信号强度”;提供按钮触发“任务结束”、“返航”、“遭遇障碍物”等事件。 + +通信日志: 实时滚动显示发送和接收的 MQTT JSON 报文(方便您调试平台端解析逻辑)。 + +2.4 技术栈推荐 +编程语言: Python 3.10+ (开发效率高,适合处理JSON和网络IO)。 + +GUI 框架: PySide6 (Qt for Python) - 界面美观,支持多线程(防界面卡顿)。 + +网络库: paho-mqtt (处理MQTT 5.0), requests (处理HTTPS上传)。 + +地图组件: PyQtWebEngine 加载 Leaflet 或 高德地图 Web API。 \ No newline at end of file diff --git a/dji_simulator b/dji_simulator new file mode 160000 index 0000000..2b5b3cd --- /dev/null +++ b/dji_simulator @@ -0,0 +1 @@ +Subproject commit 2b5b3cd7b617196ada0cf6f80473b4361e5fe556 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..efc59ed --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +PySide6 +paho-mqtt +requests +pytest +pytest-qt diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/connection_manager.py b/src/core/connection_manager.py new file mode 100644 index 0000000..e09e2a4 --- /dev/null +++ b/src/core/connection_manager.py @@ -0,0 +1,117 @@ +import paho.mqtt.client as mqtt +import logging +import time +from threading import Thread, Event + +logger = logging.getLogger(__name__) + +class ConnectionManager: + def __init__(self): + self.client = None + self.connected = False + self._stop_event = Event() + self._heartbeat_thread = None + self._message_handler = None + + def set_message_handler(self, handler): + self._message_handler = handler + + def connect(self, broker_ip: str, client_id: str, port: int = 1883, username: str = None, password: str = None): + """ + Connects to the MQTT Broker. + """ + if self.client: + self.disconnect() + + # Create a new MQTT Client instance + # Protocol V5 is recommended for modern deployments, but V3.1.1 is standard. + # Using default (usually 3.1.1 or 5.0 depending on paho version/negotiation). + self.client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv311) + + if username and password: + self.client.username_pw_set(username, password) + + # Set callbacks + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + self.client.on_publish = self._on_publish + + logger.info(f"Connecting to {broker_ip}:{port} as {client_id}...") + try: + self.client.connect(broker_ip, port, keepalive=60) + self.client.loop_start() # Start the network loop in a background thread + except Exception as e: + logger.error(f"Failed to connect: {e}") + raise + + def disconnect(self): + """ + Disconnects from the broker. + """ + self._stop_heartbeat() + if self.client: + self.client.loop_stop() + self.client.disconnect() + self.client = None + self.connected = False + logger.info("Disconnected.") + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.connected = True + logger.info("Connected successfully.") + self._start_heartbeat() + # Subscribe to services if needed, but usually caller handles subscription + # or we do it here if we know the topic. + # For now, we rely on caller or hardcoded. + # Let's subscribe to everything for this simulator or specific service topic + # client.subscribe("#") + # Ideally, we should let the ServiceHandler subscribe. + else: + self.connected = False + logger.error(f"Connection failed with result code {rc}") + + def _on_disconnect(self, client, userdata, rc): + self.connected = False + self._stop_heartbeat() + if rc != 0: + logger.warning("Unexpected disconnection.") + else: + logger.info("Disconnected gracefully.") + + def _on_message(self, client, userdata, msg): + logger.debug(f"Received message on {msg.topic}: {msg.payload}") + if self._message_handler: + self._message_handler(msg.topic, msg.payload) + + def _on_publish(self, client, userdata, mid): + pass + + def _start_heartbeat(self): + self._stop_event.clear() + self._heartbeat_thread = Thread(target=self._heartbeat_loop, daemon=True) + self._heartbeat_thread.start() + + def _stop_heartbeat(self): + self._stop_event.set() + if self._heartbeat_thread: + self._heartbeat_thread.join(timeout=1.0) + self._heartbeat_thread = None + + def _heartbeat_loop(self): + logger.info("Heartbeat loop started.") + while not self._stop_event.is_set(): + if self.client and self.connected: + # Actual topic/payload depends on DJI API specs. + # For now, we simulate a ping or minimal publish. + # Usually MQTT keepalive handles PINGREQ/PINGRESP automatically. + # But sometimes application-level heartbeat is needed. + # We will rely on Paho's built-in keepalive for network ping, + # but if an app-level heartbeat is needed (e.g. reporting state), we do it here. + # For now, let's just log or sleep. + pass + time.sleep(5) + + def is_connected(self): + return self.connected diff --git a/src/core/media_uploader.py b/src/core/media_uploader.py new file mode 100644 index 0000000..395178b --- /dev/null +++ b/src/core/media_uploader.py @@ -0,0 +1,31 @@ +import requests +import logging +import os + +logger = logging.getLogger(__name__) + +class MediaUploader: + def upload_file(self, upload_url, file_path): + """ + Uploads a file to the specified URL using HTTP PUT. + """ + if not os.path.exists(file_path): + logger.error(f"File not found: {file_path}") + return False + + logger.info(f"Starting upload of {file_path} to {upload_url}") + + try: + with open(file_path, 'rb') as f: + # DJI Cloud API usually uses PUT for OSS upload + response = requests.put(upload_url, data=f) + + if response.status_code in [200, 201]: + logger.info("Upload successful.") + return True + else: + logger.error(f"Upload failed: {response.status_code} - {response.text}") + return False + except Exception as e: + logger.error(f"Upload error: {e}") + return False diff --git a/src/core/osd_generator.py b/src/core/osd_generator.py new file mode 100644 index 0000000..9217d12 --- /dev/null +++ b/src/core/osd_generator.py @@ -0,0 +1,70 @@ +import time +import random + +class OsdGenerator: + def __init__(self, start_lat=22.5431, start_lon=113.957, start_alt=0.0): + # Default to DJI HQ coordinates approx + self.latitude = start_lat + self.longitude = start_lon + self.altitude = start_alt + + self.battery = 100.0 + self.pitch = 0.0 + self.roll = 0.0 + self.yaw = 0.0 + + self.speed_h = 0.0 + self.speed_v = 0.0 + + # State + self.is_flying = False + + def update(self, dt=1.0): + """ + Update state by delta time (seconds). + """ + if self.is_flying: + # Simulate battery drain (e.g., 1% every 30 seconds) + drain_rate = 1.0 / 30.0 + self.battery = max(0.0, self.battery - (drain_rate * dt)) + + # Simulate slight hover drift or movement if we had velocity + # For now, just drift altitude slightly + self.altitude += random.uniform(-0.1, 0.1) + if self.altitude < 0: self.altitude = 0 + + # Simulate slight attitude noise + self.pitch = random.uniform(-1.0, 1.0) + self.roll = random.uniform(-1.0, 1.0) + else: + # On ground charging or idle + if self.battery < 100: + self.battery += 0.5 * dt # Charge + if self.battery > 100: self.battery = 100.0 + + self.altitude = 0.0 + self.pitch = 0.0 + self.roll = 0.0 + + def get_osd_data(self): + """ + Return a dict compatible with DJI Cloud API OSD format (simplified). + """ + return { + "latitude": self.latitude, + "longitude": self.longitude, + "height": self.altitude, + "battery_percent": int(self.battery), + "attitude_pitch": round(self.pitch, 2), + "attitude_roll": round(self.roll, 2), + "attitude_yaw": round(self.yaw, 2), + "horizontal_speed": self.speed_h, + "vertical_speed": self.speed_v + } + + def start_fly(self): + self.is_flying = True + self.altitude = 5.0 # Takeoff to 5m + + def land(self): + self.is_flying = False diff --git a/src/core/service_handler.py b/src/core/service_handler.py new file mode 100644 index 0000000..e424276 --- /dev/null +++ b/src/core/service_handler.py @@ -0,0 +1,61 @@ +import json +import logging + +logger = logging.getLogger(__name__) + +class ServiceHandler: + def __init__(self, connection_manager, sn): + self.connection_manager = connection_manager + self.sn = sn + + def handle_message(self, topic, payload): + """ + Handle incoming MQTT messages. + """ + try: + data = json.loads(payload) + logger.debug(f"Received service message: {data}") + + # Simple dispatch based on 'method' field (common in DJI API) + method = data.get("method") + if method == "flighttask_prepare": + self.handle_flighttask_prepare(data) + elif method == "flighttask_execute": + self.handle_flighttask_execute(data) + elif method == "live_start": + self.handle_live_start(data) + else: + logger.warning(f"Unknown method: {method}") + + except json.JSONDecodeError: + logger.error("Failed to decode JSON payload") + + def handle_flighttask_prepare(self, data): + logger.info("Handling flighttask_prepare...") + # Reply with success + reply_topic = f"sys/product/{self.sn}/services_reply" # Hypothetical reply topic + # Or if it's a request/response pattern, check 'tid' (transaction id) + + tid = data.get("tid") + bid = data.get("bid") + + reply = { + "tid": tid, + "bid": bid, + "method": "flighttask_prepare", + "data": { + "result": 0 + } + } + + self.connection_manager.client.publish(reply_topic, json.dumps(reply)) + logger.info(f"Replied to flighttask_prepare on {reply_topic}") + + def handle_flighttask_execute(self, data): + logger.info("Handling flighttask_execute... (Not implemented yet)") + # Start flight logic here + pass + + def handle_live_start(self, data): + logger.info("Handling live_start... (Not implemented yet)") + pass diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..4eb3851 --- /dev/null +++ b/src/main.py @@ -0,0 +1,12 @@ +import sys +from PySide6.QtWidgets import QApplication +from src.ui.main_window import MainWindow + +def main(): + app = QApplication(sys.argv) + window = MainWindow() + window.show() + sys.exit(app.exec()) + +if __name__ == "__main__": + main() diff --git a/src/ui/__init__.py b/src/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ui/main_window.py b/src/ui/main_window.py new file mode 100644 index 0000000..c4b9bc0 --- /dev/null +++ b/src/ui/main_window.py @@ -0,0 +1,239 @@ +from PySide6.QtWidgets import (QMainWindow, QWidget, QHBoxLayout, QVBoxLayout, + QFrame, QLabel, QPushButton, QLineEdit, QGroupBox, QFormLayout) +from PySide6.QtCore import Qt, QTimer +from src.core.osd_generator import OsdGenerator +from src.core.connection_manager import ConnectionManager +from src.core.service_handler import ServiceHandler +from src.core.media_uploader import MediaUploader +from src.ui.map_widget import MapWidget +import json +import os +import tempfile + +class MainWindow(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle("DJI Cloud API Simulator") + self.resize(1200, 800) + + # Logic + self.osd_gen = OsdGenerator() + self.connection_manager = ConnectionManager() + self.service_handler = None + self.media_uploader = MediaUploader() + + # Central Widget + central_widget = QWidget() + self.setCentralWidget(central_widget) + + # Main Layout (Horizontal) + main_layout = QHBoxLayout(central_widget) + main_layout.setContentsMargins(10, 10, 10, 10) + main_layout.setSpacing(10) + + # --- Left Panel: Configuration --- + self.left_panel = QFrame() + self.left_panel.setFrameShape(QFrame.StyledPanel) + self.left_panel.setFixedWidth(300) + self._init_left_panel() + + # --- Middle Panel: Map & Status --- + self.middle_panel = QFrame() + self.middle_panel.setFrameShape(QFrame.StyledPanel) + self.middle_panel.setSizePolicy( + self.middle_panel.sizePolicy().horizontalPolicy(), + self.middle_panel.sizePolicy().verticalPolicy() + ) + self._init_middle_panel() + + # --- Right Panel: Logs & Debug --- + self.right_panel = QFrame() + self.right_panel.setFrameShape(QFrame.StyledPanel) + self.right_panel.setFixedWidth(300) + self._init_right_panel() + + # Add to layout + main_layout.addWidget(self.left_panel) + main_layout.addWidget(self.middle_panel, stretch=1) # Middle panel takes remaining space + main_layout.addWidget(self.right_panel) + + # Timer + self.timer = QTimer() + self.timer.timeout.connect(self._on_tick) + self.timer.start(1000) # 1Hz + + def _init_left_panel(self): + layout = QVBoxLayout(self.left_panel) + + # Device Config Group + config_group = QGroupBox("Device Configuration") + config_layout = QVBoxLayout() + + config_layout.addWidget(QLabel("Broker IP:")) + self.input_broker_ip = QLineEdit("127.0.0.1") + config_layout.addWidget(self.input_broker_ip) + + config_layout.addWidget(QLabel("Device SN:")) + self.input_sn = QLineEdit("1581F4BM12345") + config_layout.addWidget(self.input_sn) + + config_layout.addWidget(QLabel("App ID:")) + self.input_app_id = QLineEdit() + config_layout.addWidget(self.input_app_id) + + config_layout.addWidget(QLabel("App Key:")) + self.input_app_key = QLineEdit() + config_layout.addWidget(self.input_app_key) + + self.btn_connect = QPushButton("Connect") + self.btn_connect.clicked.connect(self._on_connect_clicked) + config_layout.addWidget(self.btn_connect) + + config_group.setLayout(config_layout) + + layout.addWidget(config_group) + + # Control Group + control_group = QGroupBox("Simulation Control") + control_layout = QVBoxLayout() + + self.btn_takeoff = QPushButton("Take Off") + self.btn_takeoff.clicked.connect(self.osd_gen.start_fly) + control_layout.addWidget(self.btn_takeoff) + + self.btn_land = QPushButton("Land") + self.btn_land.clicked.connect(self.osd_gen.land) + control_layout.addWidget(self.btn_land) + + self.btn_upload = QPushButton("Simulate Media Upload") + self.btn_upload.clicked.connect(self._on_upload_clicked) + control_layout.addWidget(self.btn_upload) + + control_group.setLayout(control_layout) + layout.addWidget(control_group) + + layout.addStretch() + + def _init_middle_panel(self): + layout = QVBoxLayout(self.middle_panel) + + # Map Widget + self.map_widget = MapWidget() + self.map_widget.setSizePolicy( + self.map_widget.sizePolicy().horizontalPolicy(), + self.map_widget.sizePolicy().verticalPolicy() + ) + + layout.addWidget(self.map_widget, stretch=2) + + # Dashboard Group + dashboard_group = QGroupBox("Status Dashboard") + dashboard_layout = QFormLayout() + + self.lbl_latitude = QLabel("0.0") + self.lbl_longitude = QLabel("0.0") + self.lbl_altitude = QLabel("0.0 m") + self.lbl_battery = QLabel("100 %") + self.lbl_attitude = QLabel("P: 0 R: 0 Y: 0") + + dashboard_layout.addRow("Latitude:", self.lbl_latitude) + dashboard_layout.addRow("Longitude:", self.lbl_longitude) + dashboard_layout.addRow("Altitude:", self.lbl_altitude) + dashboard_layout.addRow("Battery:", self.lbl_battery) + dashboard_layout.addRow("Attitude:", self.lbl_attitude) + + dashboard_group.setLayout(dashboard_layout) + + layout.addWidget(dashboard_group, stretch=1) + + def _init_right_panel(self): + layout = QVBoxLayout(self.right_panel) + + log_label = QLabel("Communication Logs") + layout.addWidget(log_label) + + # Placeholder for logs (e.g., QTextEdit later) + self.log_area = QLabel("Log Output...") + self.log_area.setAlignment(Qt.AlignTop | Qt.AlignLeft) + self.log_area.setStyleSheet("background-color: #fff; border: 1px solid #999;") + self.log_area.setWordWrap(True) + + layout.addWidget(self.log_area, stretch=1) + + self.btn_clear_logs = QPushButton("Clear Logs") + self.btn_clear_logs.clicked.connect(lambda: self.log_area.setText("")) + layout.addWidget(self.btn_clear_logs) + + def _on_connect_clicked(self): + if not self.connection_manager.is_connected(): + ip = self.input_broker_ip.text() + sn = self.input_sn.text() + + # Init Service Handler + self.service_handler = ServiceHandler(self.connection_manager, sn) + self.connection_manager.set_message_handler(self.service_handler.handle_message) + + # For now, minimal auth + try: + self.connection_manager.connect(ip, sn) + + # Subscribe to services + + service_topic = f"thing/product/{sn}/services" + self.connection_manager.client.subscribe(service_topic) + + self.btn_connect.setText("Disconnect") + self.log_area.setText(f"Connected to {ip}\nSubscribed to {service_topic}") + except Exception as e: + self.log_area.setText(f"Connection Failed: {e}") + else: + self.connection_manager.disconnect() + self.service_handler = None + self.btn_connect.setText("Connect") + self.log_area.setText("Disconnected") + + def _on_tick(self): + self.osd_gen.update() + data = self.osd_gen.get_osd_data() + + self.lbl_latitude.setText(str(data["latitude"])) + self.lbl_longitude.setText(str(data["longitude"])) + self.lbl_altitude.setText(f"{data['height']:.2f} m") + self.lbl_battery.setText(f"{data['battery_percent']} %") + self.lbl_attitude.setText(f"P: {data['attitude_pitch']} R: {data['attitude_roll']} Y: {data['attitude_yaw']}") + + # Update Map + self.map_widget.update_position(data["latitude"], data["longitude"]) + + if self.connection_manager.is_connected(): + sn = self.input_sn.text() + topic = f"sys/product/{sn}/osd" + payload = json.dumps(data) + self.connection_manager.client.publish(topic, payload) + # Optional: Log to UI (truncated) + # self.log_area.setText(f"Published to {topic}:\n{payload}") + + def _on_upload_clicked(self): + # Create a dummy file + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: + tmp.write(b"Mock Image Data") + tmp_path = tmp.name + + # Mock URL + url = "http://example.com/upload-mock" # This will fail in reality, but logs will show attempt + + self.log_area.setText(f"Uploading {tmp_path} to {url}...") + + # Run in thread? For now, blocking is okay for simulator demo, but better async. + # Just calling it directly for simplicity. + success = self.media_uploader.upload_file(url, tmp_path) + + if success: + self.log_area.setText(self.log_area.text() + "\nUpload Success!") + else: + self.log_area.setText(self.log_area.text() + "\nUpload Failed (Expected if no server).") + + os.remove(tmp_path) + + + diff --git a/src/ui/map.html b/src/ui/map.html new file mode 100644 index 0000000..da6dbac --- /dev/null +++ b/src/ui/map.html @@ -0,0 +1,43 @@ + + + + Map + + + + + + + +
+ + + diff --git a/src/ui/map_widget.py b/src/ui/map_widget.py new file mode 100644 index 0000000..2baf8a9 --- /dev/null +++ b/src/ui/map_widget.py @@ -0,0 +1,24 @@ +import os +from PySide6.QtWidgets import QWidget, QVBoxLayout +from PySide6.QtWebEngineWidgets import QWebEngineView +from PySide6.QtCore import QUrl + +class MapWidget(QWidget): + def __init__(self): + super().__init__() + self.layout = QVBoxLayout(self) + self.layout.setContentsMargins(0, 0, 0, 0) + + self.web_view = QWebEngineView() + self.layout.addWidget(self.web_view) + + # Load local HTML + current_dir = os.path.dirname(os.path.abspath(__file__)) + html_path = os.path.join(current_dir, "map.html") + self.web_view.setUrl(QUrl.fromLocalFile(html_path)) + + def update_position(self, lat, lon): + # Call JS function + # Ensure lat/lon are floats + script = f"updateDronePosition({lat}, {lon});" + self.web_view.page().runJavaScript(script) diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_main_window.py b/tests/test_main_window.py new file mode 100644 index 0000000..d786c86 --- /dev/null +++ b/tests/test_main_window.py @@ -0,0 +1,27 @@ +import pytest +from PySide6.QtWidgets import QMainWindow, QLineEdit, QPushButton +from src.ui.main_window import MainWindow + +def test_main_window_initialization(qtbot): + """ + Test if the main window initializes correctly with the expected title and widgets. + """ + window = MainWindow() + qtbot.addWidget(window) + + # Check title + assert window.windowTitle() == "DJI Cloud API Simulator" + + # Check if key widgets are present (using object traversal or just knowing they were created) + # Since we didn't assign object names or make all of them public attributes, + # we can check the public ones we did create in _init_left_panel + + assert isinstance(window.input_broker_ip, QLineEdit) + assert window.input_broker_ip.text() == "127.0.0.1" + + assert isinstance(window.btn_connect, QPushButton) + assert window.btn_connect.text() == "Connect" + + # Check geometry/visibility (optional, but good sanity check) + assert window.width() >= 800 + assert window.height() >= 600 diff --git a/tests/test_main_window_integration.py b/tests/test_main_window_integration.py new file mode 100644 index 0000000..7dfa07c --- /dev/null +++ b/tests/test_main_window_integration.py @@ -0,0 +1,55 @@ +import pytest +from unittest.mock import MagicMock, patch +from src.ui.main_window import MainWindow + +def test_on_tick_updates_ui_and_publishes(qtbot): + with patch("src.ui.main_window.ConnectionManager") as mock_conn_mgr_cls, \ + patch("src.ui.main_window.OsdGenerator") as mock_osd_cls: + + # Setup mocks + mock_conn_instance = mock_conn_mgr_cls.return_value + mock_conn_instance.is_connected.return_value = True + + mock_osd_instance = mock_osd_cls.return_value + mock_osd_instance.get_osd_data.return_value = { + "latitude": 22.0, "longitude": 113.0, "height": 10.0, + "battery_percent": 90, "attitude_pitch": 1.0, + "attitude_roll": 2.0, "attitude_yaw": 3.0, + "horizontal_speed": 0, "vertical_speed": 0 + } + + window = MainWindow() + qtbot.addWidget(window) + + # Force a tick + window._on_tick() + + # Check UI updates + assert window.lbl_latitude.text() == "22.0" + assert window.lbl_altitude.text() == "10.00 m" + + # Check Publish + mock_conn_instance.client.publish.assert_called() + args = mock_conn_instance.client.publish.call_args + # Topic should contain SN (default 1581F4BM12345) + assert "sys/product/1581F4BM12345/osd" in args[0][0] + # Payload should contain json data + assert "22.0" in args[0][1] + +def test_connect_button_toggle(qtbot): + with patch("src.ui.main_window.ConnectionManager") as mock_conn_mgr_cls: + mock_conn_instance = mock_conn_mgr_cls.return_value + mock_conn_instance.is_connected.side_effect = [False, True] # First call False, then True + + window = MainWindow() + qtbot.addWidget(window) + + # Initial state + assert window.btn_connect.text() == "Connect" + + # Click connect + window.btn_connect.click() + + # Should call connect + mock_conn_instance.connect.assert_called() + assert window.btn_connect.text() == "Disconnect" diff --git a/tests/test_media_mock.py b/tests/test_media_mock.py new file mode 100644 index 0000000..8294393 --- /dev/null +++ b/tests/test_media_mock.py @@ -0,0 +1,32 @@ +import pytest +from unittest.mock import patch, MagicMock +from src.core.media_uploader import MediaUploader +import os + +def test_upload_success(tmp_path): + # Create dummy file + f = tmp_path / "test.jpg" + f.write_text("dummy image data") + + uploader = MediaUploader() + + with patch("src.core.media_uploader.requests.put") as mock_put: + mock_put.return_value.status_code = 200 + + result = uploader.upload_file("http://mock-upload-url.com", str(f)) + + assert result is True + mock_put.assert_called_once() + +def test_upload_failure(tmp_path): + f = tmp_path / "test.jpg" + f.write_text("dummy") + + uploader = MediaUploader() + + with patch("src.core.media_uploader.requests.put") as mock_put: + mock_put.return_value.status_code = 403 + + result = uploader.upload_file("http://mock-upload-url.com", str(f)) + + assert result is False diff --git a/tests/test_mqtt_client.py b/tests/test_mqtt_client.py new file mode 100644 index 0000000..25efc7e --- /dev/null +++ b/tests/test_mqtt_client.py @@ -0,0 +1,49 @@ +import pytest +from unittest.mock import MagicMock, patch +from src.core.connection_manager import ConnectionManager + +@pytest.fixture +def connection_manager(): + return ConnectionManager() + +def test_connect_calls_client_connect(connection_manager): + with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls: + mock_client_instance = mock_client_cls.return_value + + connection_manager.connect("127.0.0.1", "test_client") + + mock_client_cls.assert_called_with(client_id="test_client", protocol=4) # protocol=4 is MQTTv311 usually, check paho constants if strict + mock_client_instance.connect.assert_called_with("127.0.0.1", 1883, keepalive=60) + mock_client_instance.loop_start.assert_called_once() + +def test_disconnect_calls_client_disconnect(connection_manager): + with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls: + mock_client_instance = mock_client_cls.return_value + connection_manager.connect("127.0.0.1", "test_client") + + connection_manager.disconnect() + + mock_client_instance.loop_stop.assert_called_once() + mock_client_instance.disconnect.assert_called_once() + assert connection_manager.client is None + assert connection_manager.connected is False + +def test_on_connect_callback(connection_manager): + with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls: + # We don't need to mock the instance methods deeply, just the flow. + connection_manager.connect("127.0.0.1", "test_client") + + # Simulate successful connection callback + # on_connect signature: client, userdata, flags, rc + connection_manager._on_connect(None, None, None, 0) + + assert connection_manager.connected is True + +def test_on_connect_callback_failure(connection_manager): + with patch("src.core.connection_manager.mqtt.Client") as mock_client_cls: + connection_manager.connect("127.0.0.1", "test_client") + + # Simulate failed connection callback (rc != 0) + connection_manager._on_connect(None, None, None, 5) + + assert connection_manager.connected is False diff --git a/tests/test_osd_generator.py b/tests/test_osd_generator.py new file mode 100644 index 0000000..5bb8147 --- /dev/null +++ b/tests/test_osd_generator.py @@ -0,0 +1,31 @@ +import pytest +from src.core.osd_generator import OsdGenerator + +def test_initial_state(): + gen = OsdGenerator() + data = gen.get_osd_data() + assert data["battery_percent"] == 100 + assert data["height"] == 0.0 + assert not gen.is_flying + +def test_flight_update(): + gen = OsdGenerator() + gen.start_fly() + assert gen.is_flying + assert gen.altitude > 0 + + # Update for 30 seconds + gen.update(dt=30.0) + + data = gen.get_osd_data() + # Battery should have dropped approx 1% + assert data["battery_percent"] <= 99 + +def test_charging_on_ground(): + gen = OsdGenerator() + gen.battery = 50.0 + gen.is_flying = False + + gen.update(dt=10.0) + + assert gen.battery > 50.0 diff --git a/tests/test_service_handler.py b/tests/test_service_handler.py new file mode 100644 index 0000000..8a816f5 --- /dev/null +++ b/tests/test_service_handler.py @@ -0,0 +1,30 @@ +import pytest +import json +from unittest.mock import MagicMock +from src.core.service_handler import ServiceHandler + +def test_flighttask_prepare_reply(): + mock_conn = MagicMock() + sn = "123456" + handler = ServiceHandler(mock_conn, sn) + + # Simulate inbound message + topic = f"thing/product/{sn}/services" + payload = json.dumps({ + "tid": "abc-123", + "bid": "xyz-789", + "method": "flighttask_prepare", + "data": {} + }) + + handler.handle_message(topic, payload) + + # Check if reply was published + mock_conn.client.publish.assert_called_once() + args = mock_conn.client.publish.call_args + reply_topic = args[0][0] + reply_payload = json.loads(args[0][1]) + + assert reply_topic == f"sys/product/{sn}/services_reply" + assert reply_payload["tid"] == "abc-123" + assert reply_payload["data"]["result"] == 0