wechat_sender.listener 源代码

# coding: utf-8
from __future__ import unicode_literals
from __future__ import absolute_import

import copy
import functools
import json
import os
import time
import datetime
import psutil
import logging

import sys
import tornado.web

from wechat_sender.objects import WxBot, Message, Global
from wechat_sender.utils import StatusWrapperMixin, STATUS_BOT_EXCEPTION, STATUS_PERMISSION_DENIED, \
    STATUS_TORNADO_EXCEPTION, DEFAULT_REMIND_TIME, STATUS_ERROR, DEFAULT_REPORT_TIME, DELAY_TASK, PERIODIC_TASK, \
    MESSAGE_REPORT_COMMAND, SYSTEM_TASK, MESSAGE_STATUS_COMMAND

glb = None
_logger = logging.getLogger(__name__)


class Application(tornado.web.Application):
    """
    tornado app 初始化
    """

    def __init__(self):
        handlers = [
            (r"/message", MessageHandle),
            (r"/delay_message", DelayMessageHandle),
            (r"/periodic_message", PeriodicMessageHandle),
            (r"/send_to_message", UserMessageHandle),
        ]
        settings = dict(
            static_path=os.path.join(os.path.dirname(__file__), "static"),
        )
        super(Application, self).__init__(handlers, **settings)


class MessageHandle(StatusWrapperMixin, tornado.web.RequestHandler):
    """
    普通消息处理 handle
    """

    def post(self, *args, **kwargs):
        message = self.get_argument('content', None)
        token = self.get_argument('token', None)
        receivers = self.get_argument('receivers', None)
        if not message:
            self.status_code = STATUS_ERROR
            self.write('Content is required')
            return

        if glb.token:
            if glb.token != token:
                self.status_code = STATUS_PERMISSION_DENIED
                self.write('Token is missing')
                return
        try:
            msg = Message(message, receivers=receivers)
            glb.wxbot.send_msg(msg)
            self.write('Success')
        except Exception as e:
            _logger.exception(e)
            self.status_code = STATUS_BOT_EXCEPTION
            self.write(e.message)


class DelayMessageHandle(StatusWrapperMixin, tornado.web.RequestHandler):
    """
    延时消息处理 handle
    """

    def __init__(self, application, request, *args, **kwargs):
        self.ioloop = tornado.ioloop.IOLoop.instance()
        super(DelayMessageHandle, self).__init__(application, request, *args, **kwargs)

    def post(self, *args, **kwargs):
        content = self.get_argument('content', '')
        title = self.get_argument('title', '')
        task_time = self.get_argument('time', None)
        remind = int(self.get_argument('remind', DEFAULT_REMIND_TIME))
        token = self.get_argument('token', None)
        receivers = self.get_argument('receivers', None)

        if glb.token:
            if glb.token != token:
                self.status_code = STATUS_PERMISSION_DENIED
                self.write('Token is missing')
                return
        if task_time:
            try:
                task_time = datetime.datetime.strptime(task_time, '%Y-%m-%d %H:%M:%S')
                timestamp = time.mktime(
                    (task_time - datetime.timedelta(
                        seconds=remind)).timetuple())
            except ValueError as e:
                self.status_code = STATUS_ERROR
                self.write(e.message)
                _logger.exception(e)
                return
        else:
            task_time = datetime.datetime.now()
            timestamp = int(time.mktime(task_time.timetuple()))
        try:
            message = Message(content, title, task_time, datetime.timedelta(seconds=remind), receivers=receivers)
            self.ioloop.call_at(timestamp, self.delay_task, DELAY_TASK, message)
            self.write('Success')
        except Exception as e:
            self.status_code = STATUS_TORNADO_EXCEPTION
            self.write(e.message)
            _logger.exception(e)

    @staticmethod
    def delay_task(task_type, message):
        # try:
        glb.wxbot.send_msg(message)
        _logger.info(
            '{0} Send delay message {1} at {2:%Y-%m-%d %H:%M:%S}'.format(task_type, message, datetime.datetime.now()))
        # except Exception as e:


class PeriodicMessageHandle(StatusWrapperMixin, tornado.web.RequestHandler):
    """
    周期消息处理 handle
    """

    def __init__(self, application, request, *args, **kwargs):
        self.ioloop = tornado.ioloop.IOLoop.instance()
        super(PeriodicMessageHandle, self).__init__(application, request, *args, **kwargs)

    def post(self, *args, **kwargs):
        content = self.get_argument('content', '')
        title = self.get_argument('title', '')
        interval = self.get_argument('interval', None)
        token = self.get_argument('token', None)
        receivers = self.get_argument('receivers', None)

        if glb.token:
            if glb.token != token:
                self.status_code = STATUS_PERMISSION_DENIED
                self.write('Token is missing')
                return
        if not interval:
            self.status_code = STATUS_ERROR
            self.write('interval is required')
            return
        try:
            interval = int(interval)
        except Exception as e:
            self.status_code = STATUS_ERROR
            self.write('interval must be a integer')
        try:
            message = Message(content, title=title, interval=datetime.timedelta(seconds=interval), receivers=receivers)
            user_periodic = tornado.ioloop.PeriodicCallback(
                functools.partial(self.periodic_task, PERIODIC_TASK, message),
                interval * 1000, self.ioloop)
            glb.periodic_list.append(user_periodic)
            user_periodic.start()
            self.write('Success')
        except Exception as e:
            self.status_code = STATUS_TORNADO_EXCEPTION
            self.write(e.message)
            _logger.exception(e)

    @staticmethod
    def periodic_task(task_type, message):
        glb.wxbot.send_msg(message)
        _logger.info('{0} Send periodic message {1} at {2:%Y-%m-%d %H:%M:%S}'.format(task_type, message,
                                                                                     datetime.datetime.now()))


class UserMessageHandle(StatusWrapperMixin, tornado.web.RequestHandler):
    """
    指定消息接收处理 handle
    """

    def post(self, *args, **kwargs):
        from wxpy import ensure_one

        content = self.get_argument('content', '')
        search = self.get_argument('search', '')
        token = self.get_argument('token', None)
        default_receiver = self.get_argument('receivers', None)

        if glb.token:
            if glb.token != token:
                self.status_code = STATUS_PERMISSION_DENIED
                self.write('Token is missing')
                return
        try:
            search = json.loads(search)
        except ValueError:
            search = search
        try:
            if isinstance(search, dict):
                receiver = ensure_one(glb.wxbot.bot.search(**search))
            else:
                receiver = ensure_one(glb.wxbot.bot.search(search))
        except ValueError:
            receiver = None
        if receiver:
            receiver.send_msg(content)
        else:
            msg = '消息发送失败,没有找到接收者。\n[搜索条件]: {0}\n[消息内容]:{1}'.format(search, content)
            message = Message(msg, receivers=default_receiver)
            glb.wxbot.send_msg(message)
            _logger.info(msg)
        self.write('Success')


def generate_run_info():
    """
    获取当前运行状态
    """
    uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(glb.run_info.create_time())
    memory_usage = glb.run_info.memory_info().rss
    msg = '[当前时间] {now:%H:%M:%S}\n[运行时间] {uptime}\n[内存占用] {memory}\n[发送消息] {messages}'.format(
        now=datetime.datetime.now(),
        uptime=str(uptime).split('.')[0],
        memory='{:.2f} MB'.format(memory_usage / 1024 ** 2),
        messages=len(glb.wxbot.bot.messages)
    )
    return msg


def check_bot(task_type=SYSTEM_TASK):
    """
    wxpy bot 健康检查任务
    """
    if glb.wxbot.bot.alive:
        msg = generate_run_info()
        message = Message(content=msg, receivers='status')
        glb.wxbot.send_msg(message)
        _logger.info(
            '{0} Send status message {1} at {2:%Y-%m-%d %H:%M:%S}'.format(task_type, msg, datetime.datetime.now()))
    else:
        # todo
        pass


def timeout_message_report():
    """
    周期/延时 消息报告
    """
    timeout_list = glb.ioloop._timeouts
    delay_task = []
    for timeout in timeout_list:
        if not timeout.callback:
            continue
        if len(timeout.callback.args) == 2:
            task_type, message = timeout.callback.args
            delay_task.append(message)
    msg = '当前已注册延时消息共有{0}条'.format(len(delay_task))
    for i, itm in enumerate(delay_task):
        msg = '{pre}\n[ID (序号) ]:D{index}\n[消息接收]:{receiver}\n[发送时间]:{remind}\n[消息时间]:{time}\n[消息标题]:{message}\n'.format(
            pre=msg, index=i, remind=itm.remind, time=itm.time, message=itm.title or itm.content, receiver=itm.receiver)
    interval_task = [(periodic.callback.args[1], periodic.is_running()) for periodic in glb.periodic_list if
                     len(periodic.callback.args) == 2 and periodic.callback.args[0] == PERIODIC_TASK]
    msg = '{0}\n当前已注册周期消息共有{1}条'.format(msg, len(interval_task))
    for i, itm in enumerate(interval_task):
        msg = '{pre}\n[ID (序号) ]:P{index}\n[消息接收]:{receiver}\n[运行状态]:{status}\n[发送周期]:{interval}\n[消息标题]:{message}\n'.format(
            pre=msg, index=i, interval=itm[0].interval, status='已激活' if itm[1] else '未激活',
            message=itm[0].title or itm[0].content, receiver=itm[0].receiver)
    return msg


def register_listener_handle(wxbot):
    """
    wechat_sender 向 wxpy 注册控制消息 handler
    """
    from wxpy import TEXT

    @wxbot.bot.register(wxbot.default_receiver, TEXT, except_self=False)
    def sender_command_handle(msg):
        command_dict = {MESSAGE_REPORT_COMMAND: timeout_message_report(),
                        MESSAGE_STATUS_COMMAND: generate_run_info()}
        message = command_dict.get(msg.text, None)
        if message:
            return message
        myself = wxbot.bot.registered.get_config(msg)
        registered_copy = copy.copy(wxbot.bot.registered)
        registered_copy.remove(myself)
        pre_conf = registered_copy.get_config(msg)
        if pre_conf:
            my_name = sys._getframe().f_code.co_name
            if my_name != pre_conf.func.__name__:
                pre_conf.func(msg)


[文档]def listen(bot, receivers=None, token=None, port=10245, status_report=False, status_receiver=None, status_interval=DEFAULT_REPORT_TIME): """ 传入 bot 实例并启动 wechat_sender 服务 :param bot: (必填|Bot对象) - wxpy 的 Bot 对象实例 :param receivers: (选填|wxpy.Chat 对象|Chat 对象列表) - 消息接收者,wxpy 的 Chat 对象实例, 或 Chat 对象列表,如果为 list 第一个 Chat 为默认接收者。如果为 Chat 对象,则默认接收者也是此对象。 不填为当前 bot 对象的文件接收者 :param token: (选填|str) - 信令,防止 receiver 被非法滥用,建议加上 token 防止非法使用,如果使用 token 请在初始化 `Sender()` 时也使用统一 token,否则无法发送。token 建议为 32 位及以上的无规律字符串 :param port: (选填|int) - 监听端口, 监听端口默认为 10245 ,如有冲突或特殊需要请自行指定,需要和 `Sender()` 统一 :param status_report: (选填|bool) - 是否开启状态报告,如果开启,wechat_sender 将会定时发送状态信息到 status_receiver :param status_receiver: (选填|Chat 对象) - 指定 status_receiver,不填将会发送状态消息给默认接收者 :param status_interval: (选填|int|datetime.timedelta) - 指定状态报告发送间隔时间,为 integer 时代表毫秒 """ global glb periodic_list = [] app = Application() wxbot = WxBot(bot, receivers, status_receiver) register_listener_handle(wxbot) process = psutil.Process() app.listen(port) if status_report: if isinstance(status_interval, datetime.timedelta): status_interval = status_interval.seconds * 1000 check_periodic = tornado.ioloop.PeriodicCallback(functools.partial(check_bot, SYSTEM_TASK), status_interval) check_periodic.start() periodic_list.append(check_periodic) glb = Global(wxbot=wxbot, run_info=process, periodic_list=periodic_list, ioloop=tornado.ioloop.IOLoop.instance(), token=token) tornado.ioloop.IOLoop.current().start()