一、实现途径
有了AiPi-PalChatv1开发板和小智人工智能后台的夹持,不实现点实用功能难免浪费,首先肯定是自动化控制家里的电器。通过查阅资料,有多种途径。
1、可以直接对开发板编程,定制自己的固件。
2、可以通过home assistant mcp server,这个需要有home assistant ,并且还要满足一定的要求。
3、自己写MCP server,小智后台提供了MCP接口和示例。
我采用第三种方式,把之前厨房的甲烷传感器,卫生间的热水器,空调,灯等能接入的全部实现了自动控制,效果非常惊喜。
二、硬件优化

从之前的测试和最近一段时间的摸索,发现语音识别依赖于采集的声音的质量,我把开发板上的两个开关电源芯片禁用了,你也可以直接拆掉。然后通过外接的LDO实现的5V和3.3的供电,实际使用效果良好,同样的咪头,唤醒距离增加一倍,对话流畅。先上视频。
三、MCP 服务器代码
# -*- coding: utf-8 -*-
# 以下代码在2025年6月18日 python3.11环境下运行通过
import sys
sys.path.append('/usr/local/lib/python3.7/dist-packages')
import paho.mqtt.client as mqtt
#
import sys
sys.path.append('/usr/lib/python3/dist-packages')
import requests
import json
from mcp.server.fastmcp import FastMCP
import logging
import socket
import threading
import time
def connTCP(topic:str,msg:str):
global tcp_client_socket
# 创建socket
tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# IP 和端口
server_ip = 'bemfa.com'
server_port = 8344
uid = 'UUID'
try:
# 连接服务器
tcp_client_socket.connect((server_ip, server_port))
#发送订阅指令
substr = 'cmd=2&uid=' + uid + '&topic=' + topic + '&msg=' + msg + '\r\n'
tcp_client_socket.send(substr.encode("utf-8"))
time.sleep(2)
tcp_client_socket.shutdown(2)
tcp_client_socket.close()
except:
time.sleep(2)
tcp_client_socket.shutdown(2)
tcp_client_socket.close()
#connTCP()
#Kitchen_temperature = ""
#Kitchen_CH4 = ""
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("****") # 订阅消息
#消息接收
def on_message(client, userdata, msg):
global Ktemperature
global KCH4
#print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))
if msg.topic == topicA: #判断topic是否是topicA
#print(" topicA msg")
msgstr = str(msg.payload.decode('utf-8'))
Ktemperature = msgstr.split("#")[-2]
KCH4 = msgstr.split("#")[-1]
#print(Kitchen_temperature)
#print(" topicA msg")
#print(Kitchen_CH4)
#if str(msg.payload.decode('utf-8')) == "ON": #如果接收字符on,亮灯
# connTCP('topic','ON') #开灯函数
#if str(msg.payload.decode('utf-8')) == "OFF": #如果接收字亮灯
# connTCP('topic','OFF') #关灯函数
#订阅成功
def on_subscribe(client, userdata, mid, granted_qos):
print("On Subscribed: qos = %d" % granted_qos)
# 失去连接
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection %s" % rc)
logger = logging.getLogger('WHcontrol')
# Create an MCP server
mcp = FastMCP("WHcontrol")
# Add an addition tool
@mcp.tool()
def get_kitchen_temperature() -> dict:
"""用于查询厨房温度,返回温度值,单位摄氏度。"""
global Ktemperature
if float(Ktemperature) > 45:
Ktemperature = f'{float(Ktemperature)-18:.1f}' #修正温度值,由于电子温度计装的高,不修正,人工智能容易报警
result = Ktemperature
#print(result)
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def get_kitchen_CH4() -> dict:
"""用于查询厨房天然气或是甲烷含量,返回天然气或是甲烷PPM值。"""
global KCH4
if int(KCH4) > 440:
KCH4 = f'{int(KCH4)-440}'
result = KCH4
# print(result)
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def get_wh_timer_switch() -> dict:
"""用于查询热水器定时器开关的状态,on是打开,off是关闭。"""
headers={ "content-type": "application/x-www-form-urlencoded"}
urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'
response = requests.get(url=urlmsg,headers=headers,timeout=5)
if response.status_code != 200:
raise ConnectionError(f'{url} status code is {response.status_code}.')
response = json.loads(response.content)
if 'data' not in response.keys():
raise ValueError(f'{url} miss key msg.')
response = json.dumps(response['data'][0])
response = json.loads(response)
if 'msg' not in response.keys():
raise ValueError(f'{url} miss key msg.')
result = response['msg'].split("#")[-1]
print(result)
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def set_wh_timer_switch(sw:str) -> dict:
"""设置热水器定时加热开关状态,on是打开,off是关闭。打开后在每天晚上20点自动开始加热,一个小时后关闭加热开关。"""
if sw == "on":
connTCP("topic","G2on")
if sw == "off":
connTCP("topic","G2off")
result = sw
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def get_wh_switch() -> dict:
"""用于查询热水器加热开关的状态,on是打开,off是关闭。"""
headers={ "content-type": "application/x-www-form-urlencoded"}
urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'
response = requests.get(url=urlmsg,headers=headers,timeout=5)
if response.status_code != 200:
raise ConnectionError(f'{url} status code is {response.status_code}.')
response = json.loads(response.content)
if 'data' not in response.keys():
raise ValueError(f'{url} miss key msg.')
response = json.dumps(response['data'][0])
response = json.loads(response)
if 'msg' not in response.keys():
raise ValueError(f'{url} miss key msg.')
result = response['msg'].split("#")[-2]
print(result)
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def set_wh_switch(sw:str) -> dict:
"""设置热水器加热开关状态,on是打开热水器加热开关,off是关闭热水器加热开关。"""
if sw == "on":
connTCP("topic","G1on")
if sw == "off":
connTCP("topic","G1off")
result = sw
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def set_light_switch(sw:str) -> dict:
"""设置客厅照明灯的开关状态,on是打开照明灯,off是关闭照明灯。"""
if sw == "on":
connTCP("topic","A857A4AC6C") #我是通过433网关来发的控制指令,这个是我的灯的指令,修改成你们自己的
if sw == "off":
connTCP("topic","A857A4AC69")
result = sw
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def set_light_brightness(sw:str) -> dict:
"""设置客厅照明灯的亮度,add是增加亮度,fall是减小亮度。"""
if sw == "fall":
connTCP("topic","A857A4AC64")
if sw == "add":
connTCP("topic","A857A4AC68")
result = sw
logger.info(f"result: {result}")
return {"success": True, "result": result}
@mcp.tool()
def set_air_switch(sw:str) -> dict:
"""设置卧室空调的开关状态,on是打开空调,off是关闭空调。"""
if sw == "on":
connTCP("topic","H823CB2602002403053D000000007F") #我是通过红外网关来发的控制指令,这个是我的空调的指令,修改成你们自己的
if sw == "off":
connTCP("topic","H823CB26020020030F3A0000000082")
result = sw
logger.info(f"result: {result}")
return {"success": True, "result": result}
# Start the server
if __name__ == "__main__":
MQTTHOST = "bemfa.com"
MQTTPORT = 9501
client_id = "UUID"
topicA = "tipoc"
client = mqtt.Client(client_id)
client.username_pw_set("userName", "passwd")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
client.connect(MQTTHOST, MQTTPORT, 60)
client.loop_start()
time.sleep(3)
mcp.run(transport="stdio")
我的设备都是通过bemfa云控制的,具体接入文档可以参照巴法云的说明。MCP server运行在一个树莓派上,先按照小智MCP示例安装运行环境,然后直接把小智MCP接入点的字符串写到mcp_pipe.py文件中,省的设置环境变量。通过命令启动MCP server :python mcp_pipe.py switch-py.py。在小智后台 --》配置角色 最下面的MCP接入点中就应该能看到已经接入的MCP服务了。

四、完整代码
里面的一些变量需要自己修改,代码写的比较乱,大家凑合看。
附件:mcp代码.rar