dji_simulator/tests/test_service_handler.py

31 lines
881 B
Python

import pytest
import json
from unittest.mock import MagicMock
from src.core.service_handler import ServiceHandler
def test_flighttask_prepare_reply():
mock_conn = MagicMock()
sn = "123456"
handler = ServiceHandler(mock_conn, sn)
# Simulate inbound message
topic = f"thing/product/{sn}/services"
payload = json.dumps({
"tid": "abc-123",
"bid": "xyz-789",
"method": "flighttask_prepare",
"data": {}
})
handler.handle_message(topic, payload)
# Check if reply was published
mock_conn.client.publish.assert_called_once()
args = mock_conn.client.publish.call_args
reply_topic = args[0][0]
reply_payload = json.loads(args[0][1])
assert reply_topic == f"sys/product/{sn}/services_reply"
assert reply_payload["tid"] == "abc-123"
assert reply_payload["data"]["result"] == 0