发帖
0 0 0

AiPi-PalChatV1语音开发板通过MCP服务实现自动化控制加电

zzbinfo
论坛元老

10

主题

24

回帖

3178

积分

论坛元老

积分
3178
小安AI 20 0 昨天 21:45

一、实现途径

有了AiPi-PalChatv1开发板和小智人工智能后台的夹持,不实现点实用功能难免浪费,首先肯定是自动化控制家里的电器。通过查阅资料,有多种途径。

1、可以直接对开发板编程,定制自己的固件。

2、可以通过home assistant mcp server,这个需要有home assistant ,并且还要满足一定的要求。

3、自己写MCP server,小智后台提供了MCP接口和示例。

我采用第三种方式,把之前厨房的甲烷传感器,卫生间的热水器,空调,灯等能接入的全部实现了自动控制,效果非常惊喜。

二、硬件优化

ldo.jpg

从之前的测试和最近一段时间的摸索,发现语音识别依赖于采集的声音的质量,我把开发板上的两个开关电源芯片禁用了,你也可以直接拆掉。然后通过外接的LDO实现的5V和3.3的供电,实际使用效果良好,同样的咪头,唤醒距离增加一倍,对话流畅。先上视频。

三、MCP 服务器代码

# -*- coding: utf-8 -*-
# 以下代码在2025年6月18日 python3.11环境下运行通过
import sys
sys.path.append('/usr/local/lib/python3.7/dist-packages')
import paho.mqtt.client as mqtt
#

import sys
sys.path.append('/usr/lib/python3/dist-packages')
import requests
import json


from mcp.server.fastmcp import FastMCP
import logging 
import socket
import threading
import time

def connTCP(topic:str,msg:str):
    global tcp_client_socket
    # 创建socket
    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # IP 和端口
    server_ip = 'bemfa.com'
    server_port = 8344
    uid = 'UUID'

    try:
        # 连接服务器
        tcp_client_socket.connect((server_ip, server_port))
        #发送订阅指令
        substr = 'cmd=2&uid=' + uid + '&topic=' + topic + '&msg=' + msg + '\r\n'
        tcp_client_socket.send(substr.encode("utf-8"))
        time.sleep(2)
        tcp_client_socket.shutdown(2)
        tcp_client_socket.close()
    except:
        time.sleep(2)
        tcp_client_socket.shutdown(2)
        tcp_client_socket.close()
        #connTCP()

#Kitchen_temperature = ""
#Kitchen_CH4 = ""

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("****") # 订阅消息


#消息接收
def on_message(client, userdata, msg):
    global Ktemperature
    global KCH4
    #print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))
    if msg.topic == topicA:  #判断topic是否是topicA
       #print(" topicA msg")
       msgstr = str(msg.payload.decode('utf-8'))
       Ktemperature = msgstr.split("#")[-2]
       KCH4 = msgstr.split("#")[-1]
       #print(Kitchen_temperature)
       #print(" topicA msg")
       #print(Kitchen_CH4)
       #if str(msg.payload.decode('utf-8')) == "ON":  #如果接收字符on,亮灯
       #   connTCP('topic','ON') #开灯函数
       #if str(msg.payload.decode('utf-8')) == "OFF":  #如果接收字亮灯
       #   connTCP('topic','OFF')   #关灯函数


#订阅成功
def on_subscribe(client, userdata, mid, granted_qos):
    print("On Subscribed: qos = %d" % granted_qos)

# 失去连接
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection %s" % rc)

logger = logging.getLogger('WHcontrol')
# Create an MCP server
mcp = FastMCP("WHcontrol")  


# Add an addition tool
@mcp.tool() 
def get_kitchen_temperature() -> dict:
    """用于查询厨房温度,返回温度值,单位摄氏度。"""
    global Ktemperature
    if float(Ktemperature) > 45:
       Ktemperature = f'{float(Ktemperature)-18:.1f}'  #修正温度值,由于电子温度计装的高,不修正,人工智能容易报警
    result = Ktemperature
    #print(result)
    logger.info(f"result: {result}")
    return {"success": True, "result": result}

@mcp.tool()
def get_kitchen_CH4() -> dict:
    """用于查询厨房天然气或是甲烷含量,返回天然气或是甲烷PPM值。"""
    global KCH4
    if int(KCH4) > 440:
       KCH4 = f'{int(KCH4)-440}'
    result = KCH4
    # print(result)
    logger.info(f"result: {result}")
    return {"success": True, "result": result}

@mcp.tool()
def get_wh_timer_switch() -> dict:
    """用于查询热水器定时器开关的状态,on是打开,off是关闭。"""
    headers={ "content-type": "application/x-www-form-urlencoded"}
    urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'
    response = requests.get(url=urlmsg,headers=headers,timeout=5)
    if response.status_code != 200:
       raise ConnectionError(f'{url} status code is {response.status_code}.')
    response = json.loads(response.content)
    if 'data' not in response.keys():
       raise ValueError(f'{url} miss key msg.')
    response = json.dumps(response['data'][0])
    response = json.loads(response)
    if 'msg' not in response.keys():
       raise ValueError(f'{url} miss key msg.')
    result = response['msg'].split("#")[-1]
    print(result)
    logger.info(f"result: {result}")
    return {"success": True, "result": result}

@mcp.tool()
def set_wh_timer_switch(sw:str) -> dict:
    """设置热水器定时加热开关状态,on是打开,off是关闭。打开后在每天晚上20点自动开始加热,一个小时后关闭加热开关。"""
    if sw == "on":
       connTCP("topic","G2on")
    if sw == "off":
       connTCP("topic","G2off")
    result = sw
    logger.info(f"result: {result}")
    return {"success": True, "result": result}


@mcp.tool()
def get_wh_switch() -> dict:
    """用于查询热水器加热开关的状态,on是打开,off是关闭。"""
    headers={ "content-type": "application/x-www-form-urlencoded"}
    urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'
    response = requests.get(url=urlmsg,headers=headers,timeout=5)
    if response.status_code != 200:
       raise ConnectionError(f'{url} status code is {response.status_code}.')
    response = json.loads(response.content)
    if 'data' not in response.keys():
       raise ValueError(f'{url} miss key msg.')
    response = json.dumps(response['data'][0])
    response = json.loads(response)
    if 'msg' not in response.keys():
       raise ValueError(f'{url} miss key msg.')
    result = response['msg'].split("#")[-2]
    print(result)
    logger.info(f"result: {result}")
    return {"success": True, "result": result}


@mcp.tool()
def set_wh_switch(sw:str) -> dict:
    """设置热水器加热开关状态,on是打开热水器加热开关,off是关闭热水器加热开关。"""
    if sw == "on":
       connTCP("topic","G1on")
    if sw == "off":
       connTCP("topic","G1off")
    result = sw
    logger.info(f"result: {result}")
    return {"success": True, "result": result}


@mcp.tool()
def set_light_switch(sw:str) -> dict:
    """设置客厅照明灯的开关状态,on是打开照明灯,off是关闭照明灯。"""
    if sw == "on":
       connTCP("topic","A857A4AC6C") #我是通过433网关来发的控制指令,这个是我的灯的指令,修改成你们自己的
    if sw == "off":
       connTCP("topic","A857A4AC69")
    result = sw
    logger.info(f"result: {result}")
    return {"success": True, "result": result}

@mcp.tool()
def set_light_brightness(sw:str) -> dict:
    """设置客厅照明灯的亮度,add是增加亮度,fall是减小亮度。"""
    if sw == "fall":
       connTCP("topic","A857A4AC64")
    if sw == "add":
       connTCP("topic","A857A4AC68")
    result = sw
    logger.info(f"result: {result}")
    return {"success": True, "result": result}


@mcp.tool()
def set_air_switch(sw:str) -> dict:
    """设置卧室空调的开关状态,on是打开空调,off是关闭空调。"""
    if sw == "on":
       connTCP("topic","H823CB2602002403053D000000007F")  #我是通过红外网关来发的控制指令,这个是我的空调的指令,修改成你们自己的
    if sw == "off":
       connTCP("topic","H823CB26020020030F3A0000000082")
    result = sw
    logger.info(f"result: {result}")
    return {"success": True, "result": result}


# Start the server
if __name__ == "__main__":

    MQTTHOST = "bemfa.com"
    MQTTPORT = 9501
    client_id = "UUID"
    topicA = "tipoc"
    client = mqtt.Client(client_id)
    client.username_pw_set("userName", "passwd")
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    client.connect(MQTTHOST, MQTTPORT, 60)
    client.loop_start()
    time.sleep(3)
    mcp.run(transport="stdio")

我的设备都是通过bemfa云控制的,具体接入文档可以参照巴法云的说明。MCP server运行在一个树莓派上,先按照小智MCP示例安装运行环境,然后直接把小智MCP接入点的字符串写到mcp_pipe.py文件中,省的设置环境变量。通过命令启动MCP server :python mcp_pipe.py switch-py.py。在小智后台 --》配置角色 最下面的MCP接入点中就应该能看到已经接入的MCP服务了。

mcp.jpg

四、完整代码

里面的一些变量需要自己修改,代码写的比较乱,大家凑合看。

upload 附件:mcp代码.rar

──── 0人觉得很赞 ────

使用道具 举报

您需要登录后才可以回帖 立即登录
高级模式
返回
统计信息
  • 会员数: 29016 个
  • 话题数: 41778 篇