31 lines
881 B
Python
31 lines
881 B
Python
import pytest
|
|
import json
|
|
from unittest.mock import MagicMock
|
|
from src.core.service_handler import ServiceHandler
|
|
|
|
def test_flighttask_prepare_reply():
|
|
mock_conn = MagicMock()
|
|
sn = "123456"
|
|
handler = ServiceHandler(mock_conn, sn)
|
|
|
|
# Simulate inbound message
|
|
topic = f"thing/product/{sn}/services"
|
|
payload = json.dumps({
|
|
"tid": "abc-123",
|
|
"bid": "xyz-789",
|
|
"method": "flighttask_prepare",
|
|
"data": {}
|
|
})
|
|
|
|
handler.handle_message(topic, payload)
|
|
|
|
# Check if reply was published
|
|
mock_conn.client.publish.assert_called_once()
|
|
args = mock_conn.client.publish.call_args
|
|
reply_topic = args[0][0]
|
|
reply_payload = json.loads(args[0][1])
|
|
|
|
assert reply_topic == f"sys/product/{sn}/services_reply"
|
|
assert reply_payload["tid"] == "abc-123"
|
|
assert reply_payload["data"]["result"] == 0
|