|
- # -*- coding: utf-8 -*-
- # filename: handle.py
-
- import hashlib
- import reply
- import receive
- import web
- # laska
- from basic import Basic
- import utils
- import os
- from media import Media
- from ASR.asr_module import AsrModule
- from translator import TranslateInterface
-
- root_dir = os.getcwd()
- config_file_path = "config/config.json"
-
- class Handle(object):
- def __init__(self):
- self.config_dict = utils.config_reader(config_file_path)
- self.basic_module = Basic(self.config_dict) # 用于获取access token的basic模块
- self.media_module = Media(self.config_dict) # 用于拉取语音文件的模块
- self.asr_module = AsrModule(self.config_dict) # 语音转录模块
- # self.message_processor = MessageProcessor(self.config_dict)
- self.translator = TranslateInterface(self.config_dict) # 翻译接口
- self.logger = utils.Logger() # 日志记录
-
- # 服务器和微信服务器确认
- def GET(self):
- try:
- data = web.input()
- if len(data) == 0:
- return "hello, this is handle view"
- signature = data.signature
- timestamp = data.timestamp
- nonce = data.nonce
- echostr = data.echostr
- token = "12345" #请按照公众平台官网\基本配置中信息填写
-
- list_data = [token, timestamp, nonce]
- list_data.sort()
- temp = "".join(list_data)
- sha1 = hashlib.sha1()
- # map(sha1.update, list)
- sha1.update(temp.encode("utf-8"))
- hashcode = sha1.hexdigest()
- print("handle/GET func: hashcode, signature: ", hashcode, signature)
- if hashcode == signature:
- return echostr
- else:
- return ""
- except Exception as Argument:
- return Argument
-
- # 用户向公众号发送内容的处理逻辑
- def POST(self):
- try:
- webData = web.data()
- print("Handle Post webdata is ", webData)
- #后台打日志
- recMsg = receive.parse_xml(webData)
- # print(type(recMsg))
- # 如果收到服务器传来的信息
- if isinstance(recMsg, receive.Msg):
- toUser = recMsg.FromUserName # 实际消息的发送者
- fromUser = recMsg.ToUserName
- # msgId = recMsg.MsgId # 订阅消息没有msgID
- msgType = recMsg.MsgType # 获取的消息的类型
- # print("获取的消息类型是:", msgType)
- # 针对关注和取关的文本信息
- if msgType == "event":
- print("这是一个用户关注信息")
- event = recMsg.Event
- if event == "subscribe":
- content = "感谢您的关注!我们是支持多语言翻译的翻译平台,您可以试试对我说\"翻译今天天气很好到英语\""
- # print("感谢您的关注!")
- replyMsg = reply.TextMsg(toUser, fromUser, content)
- return replyMsg.send()
- else:
- return reply.Msg.send()
- # 针对文本信息的处理逻辑
- elif msgType == 'text':
- content = recMsg.Content
- # print(content.decode("utf-8"))
- content = content.decode("utf-8")
- print("the content after decode is: ", content)
- # 可能没有信息
- # trans_res = self.translator.translate(content)
- ana_dict = self.translator.translate(content)
- ana_dict["user_info"] = toUser
- trans_res = self.trans_res_process(ana_dict)
- print("返回微信的结果为:", trans_res)
- self.log_record(ana_dict)
-
- replyMsg = reply.TextMsg(toUser, fromUser, trans_res)
- return replyMsg.send()
- # 针对语音信息的处理逻辑
- elif msgType == "voice": # 微信获取语音格式为amr,采样率为8k
- # 待完善的代码
- mediaId = recMsg.MediaId # 获取音频的messageid
- voiceFormat = recMsg.Format # 音频的存储类型
- if recMsg.RecMessage:
- print("语音识别结果是微信自己的接口返回的")
- rec_text = recMsg.RecMessage
- else:
- # 需要使用mediaID和accessToken来下载音频
- access_token = self.basic_module.get_access_token() # 获取accessToken
- save_file_path = self.media_module.get(access_token, mediaId)
- absolute_path = os.path.join(root_dir, save_file_path) # 保存音频文件的绝对路径
- print("音频文件的绝对路径为:", absolute_path)
- rec_text = self.asr_module.recognition(absolute_path) # 音频转录后得到的文字
- # print(rec_text)
- # trans_res = self.translator.translate(rec_text, voice_flag=True)
- ana_dict = self.translator.translate(rec_text, voice_flag=True)
- ana_dict["user_info"] = toUser
- trans_res = self.trans_res_process(ana_dict)
- print("返回微信的结果为:", trans_res)
- self.log_record(ana_dict)
- replyMsg = reply.TextMsg(toUser, fromUser, trans_res)
-
- return replyMsg.send()
- # 针对图片消息的处理逻辑
- elif msgType == 'image':
- mediaId = recMsg.MediaId
- replyMsg = reply.ImageMsg(toUser, fromUser, mediaId)
- return replyMsg.send()
- else:
- print("暂且不处理")
- return reply.Msg().send()
- except Exception as Argment:
- return Argment
-
- def log_record(self, ana_dict):
- # 记录日志
- log_str = "收到新的请求!\n"
- for key, value in ana_dict.items():
- log_str += f"{key}: {value}\n"
- self.logger.info(log_str)
-
- # 针对翻译服务返回的结果进行处理,同时记录日志内容
- def trans_res_process(self, ana_dict):
-
- if "trans_res" not in ana_dict:
- return_sen = "翻译接口好像有点问题呢~请联系我们客服解决~"
- ana_dict["return_sen"] = return_sen
- elif ana_dict["trans_res"] == "\nCodemessage": # 针对丝路接口的返回不正确的情况
- return_sen = "我们的翻译接口正忙,请稍等再尝试你的请求~"
- ana_dict["return_sen"] = return_sen
- else:
- return_sen = ana_dict["trans_res"]
- #
- return return_sen
-
- # 针对不同类型消息的实际处理逻辑代码
- # 该代码暂时不可用
- class MessageProcessor(object):
- def __init__(self, config_dict):
- self.basic_module = Basic(config_dict) # 用于获取access token的basic模块
- self.media_module = Media(config_dict) # 用于拉取语音文件的模块
- self.asr_module = AsrModule(config_dict) # 语音转录模块
-
- # 处理流程的中中控,将信息分发到各自的处理器
- def main_processor(self, recMsg: receive.Msg):
- toUser = recMsg.FromUserName
- fromUser = recMsg.ToUserName
- msgType = recMsg.MsgType # 获取的消息的类型
- if msgType == "event":
- print("这是一个用户关注信息")
- event = recMsg.Event
- if event == "subscribe":
- content = "感谢您的关注!"
- # print("感谢您的关注!")
- replyMsg = reply.TextMsg(toUser, fromUser, content)
- return replyMsg.send()
- else:
- return reply.Msg.send()
- # 针对文本信息的处理逻辑
- elif msgType == 'text':
- text_content = self.text_processor(recMsg)
-
- # 调用翻译接口
- return_message = text_content
- replyMsg = reply.TextMsg(toUser, fromUser, return_message)
- return replyMsg.send()
- # 针对图片消息的处理逻辑
- elif msgType == 'image':
- mediaId = recMsg.MediaId
- replyMsg = reply.ImageMsg(toUser, fromUser, mediaId)
- return replyMsg.send()
- # 针对语音信息的处理逻辑
- elif msgType == "voice": # 微信获取语音格式为amr,采样率为8k
- # 待完善的代码
- rec_text = self.voice_processor(recMsg)
- replyMsg = reply.TextMsg(toUser, fromUser, rec_text)
- return replyMsg.send()
- else:
- print("暂且不处理")
- return reply.Msg().send()
-
- # 针对文本信息的处理器
- def text_processor(self, recMsg):
- content = recMsg.Content
- # print(content.decode("utf-8"))
- content = content.decode("utf-8")
- # print("the content after decode is: ", content)
- return content
-
- # 针对音频信息的处理器
- def voice_processor(self, recMsg):
- mediaId = recMsg.MediaId # 获取音频的messageid
- voiceFormat = recMsg.Format # 音频的存储类型
- # 需要使用mediaID和accessToken来下载音频
- access_token = self.basic_module.get_access_token() # 获取accessToken
- save_file_path = self.media_module.get(access_token, mediaId)
- absolute_path = os.path.join(root_dir, save_file_path) # 保存音频文件的绝对路径
- # print("音频文件的绝对路径为:", absolute_path)
- rec_text = self.asr_module.recognition(absolute_path, voiceFormat)
- print(rec_text) # 转录获取的文本
- return rec_text
-
- def image_processor(self,):
- pass
|