logo NodeSeekbeta

DeepFlood论坛关键词推送机器人来啦~

前言

参考的ns类似的机器人 这个机器人写起来不能说有难度 只能说官方给的条件很成熟 感谢酒神给的rss链接!

如何部署

由于我还没来得及把他扔github上 先把代码贴这里吧
安装下依赖:

pip install python-telegram-bot feedparser apscheduler

然后复制一波我这个代码:

import logging
import sqlite3
import feedparser
import requests
import time
import threading
from datetime import datetime
import os

# 配置日志
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# 机器人配置
BOT_TOKEN = "552212031:srdgkijiejqiowejvi12u3hf"  # 请替换为你的机器人token
RSS_URL = "https://feed.deepflood.com/topic.rss.xml"
BOT_NAME = "DeepFlood关键词通知机器人"

# 代理配置 - 修改这里来设置代理
# 格式: "http://用户名:密码@代理地址:端口" 或 "http://代理地址:端口"
# 如果不需要代理,设置为 None 或 ""
PROXY = ""# 示例: "http://127.0.0.1:1080" 或 "http://user:[email protected]:8080"

class TelegramBot:
    def __init__(self):
        self.token = BOT_TOKEN
        self.base_url = f"https://api.telegram.org/bot{self.token}"
        self.last_update_id = 0
        self.proxies = self.setup_proxy()
        self.init_db()
    
    def setup_proxy(self):
        """设置代理"""
        if PROXY and PROXY.strip():
            logger.info(f"使用代理: {PROXY}")
            return {
                "http": PROXY,
                "https": PROXY
            }
        else:
            logger.info("不使用代理")
            return None
    
    def init_db(self):
        """初始化数据库"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS user_keywords
            (user_id INTEGER, keyword TEXT, UNIQUE(user_id, keyword))
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS sent_posts
            (post_guid TEXT PRIMARY KEY, sent_time TIMESTAMP)
        ''')
        conn.commit()
        conn.close()
        logger.info("数据库初始化完成")
    
    def send_message(self, chat_id, text):
        """发送消息到Telegram"""
        url = f"{self.base_url}/sendMessage"
        data = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": "HTML"
        }
        try:
            response = requests.post(url, data=data, timeout=10, proxies=self.proxies)
            if response.status_code == 200:
                return True
            else:
                logger.error(f"发送消息失败: {response.text}")
                return False
        except Exception as e:
            logger.error(f"发送消息异常: {e}")
            return False
    
    def get_updates(self):
        """获取Telegram更新"""
        url = f"{self.base_url}/getUpdates"
        params = {
            "offset": self.last_update_id + 1,
            "timeout": 30
        }
        try:
            response = requests.get(url, params=params, timeout=35, proxies=self.proxies)
            if response.status_code == 200:
                data = response.json()
                if data.get("ok"):
                    return data.get("result", [])
            else:
                logger.error(f"获取更新失败: {response.text}")
        except Exception as e:
            logger.error(f"获取更新异常: {e}")
        return []
    
    def get_rss_with_proxy(self):
        """使用代理获取RSS内容"""
        try:
            response = requests.get(RSS_URL, timeout=10, proxies=self.proxies)
            if response.status_code == 200:
                # 使用feedparser解析响应内容
                return feedparser.parse(response.content)
            else:
                logger.error(f"获取RSS失败,状态码: {response.status_code}")
                return feedparser.parse("")  # 返回空的feed对象
        except Exception as e:
            logger.error(f"通过代理获取RSS失败: {e}")
            # 如果代理失败,尝试不使用代理
            if self.proxies:
                logger.info("尝试不使用代理获取RSS...")
                try:
                    return feedparser.parse(RSS_URL)
                except Exception as e2:
                    logger.error(f"不使用代理获取RSS也失败: {e2}")
            return feedparser.parse("")  # 返回空的feed对象
    
    def handle_message(self, update):
        """处理消息"""
        message = update.get("message", {})
        user_id = message.get("from", {}).get("id")
        text = message.get("text", "").strip()
        
        if not text or not user_id:
            return
        
        logger.info(f"收到消息 from {user_id}: {text}")
        
        if text.startswith("/start"):
            self.handle_start(user_id)
        elif text.startswith("/add"):
            self.handle_add_keyword(user_id, text)
        elif text.startswith("/delete"):
            self.handle_delete_keyword(user_id, text)
        elif text.startswith("/list"):
            self.handle_list_keywords(user_id)
        elif text.startswith("/help"):
            self.handle_help(user_id)
        elif text.startswith("/status"):
            self.handle_status(user_id)
        else:
            self.send_message(user_id, "❓ 未知命令,使用 /help 查看可用命令")
    
    def handle_start(self, user_id):
        """处理开始命令"""
        welcome_text = f"""
🤖 欢迎使用 {BOT_NAME}!

使用方法:
/add 关键词 - 添加监控关键词
/delete 关键词 - 删除关键词
/list - 查看我的关键词列表
/status - 查看机器人状态
/help - 显示帮助信息

当 DeepFlood 社区有新内容匹配你的关键词时,我会立即通知你!
        """
        self.send_message(user_id, welcome_text)
    
    def handle_status(self, user_id):
        """处理状态命令"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            # 获取用户的关键词数量
            c.execute("SELECT COUNT(*) FROM user_keywords WHERE user_id = ?", (user_id,))
            keyword_count = c.fetchone()[0]
            
            # 获取总用户数
            c.execute("SELECT COUNT(DISTINCT user_id) FROM user_keywords")
            total_users = c.fetchone()[0]
            
            # 获取总关键词数
            c.execute("SELECT COUNT(*) FROM user_keywords")
            total_keywords = c.fetchone()[0]
            
            status_text = f"""
📊 机器人状态:

👤 你的关键词数量: {keyword_count}
👥 总用户数: {total_users}
🔍 总监控关键词数: {total_keywords}
🌐 代理状态: {'已启用' if self.proxies else '未启用'}
🔄 检查频率: 每10秒一次
            """
            self.send_message(user_id, status_text)
        except Exception as e:
            logger.error(f"获取状态失败: {e}")
            self.send_message(user_id, "❌ 获取状态失败")
        finally:
            conn.close()
    
    def handle_add_keyword(self, user_id, text):
        """处理添加关键词命令"""
        parts = text.split(" ", 1)
        if len(parts) < 2:
            self.send_message(user_id, "❌ 请提供要添加的关键词\n用法: /add 关键词")
            return
        
        keyword = parts[1].strip()
        if not keyword:
            self.send_message(user_id, "❌ 关键词不能为空")
            return
        
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("INSERT INTO user_keywords (user_id, keyword) VALUES (?, ?)", (user_id, keyword))
            conn.commit()
            self.send_message(user_id, f"✅ 已添加关键词: {keyword}")
            logger.info(f"用户 {user_id} 添加关键词: {keyword}")
        except sqlite3.IntegrityError:
            self.send_message(user_id, f"⚠️ 关键词 '{keyword}' 已经存在")
        except Exception as e:
            logger.error(f"添加关键词失败: {e}")
            self.send_message(user_id, "❌ 添加关键词失败,请稍后重试")
        finally:
            conn.close()
    
    def handle_delete_keyword(self, user_id, text):
        """处理删除关键词命令"""
        parts = text.split(" ", 1)
        if len(parts) < 2:
            self.send_message(user_id, "❌ 请提供要删除的关键词\n用法: /delete 关键词")
            return
        
        keyword = parts[1].strip()
        if not keyword:
            self.send_message(user_id, "❌ 关键词不能为空")
            return
        
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("DELETE FROM user_keywords WHERE user_id = ? AND keyword = ?", (user_id, keyword))
            conn.commit()
            
            if c.rowcount > 0:
                self.send_message(user_id, f"✅ 已删除关键词: {keyword}")
                logger.info(f"用户 {user_id} 删除关键词: {keyword}")
            else:
                self.send_message(user_id, f"❌ 未找到关键词: {keyword}")
        except Exception as e:
            logger.error(f"删除关键词失败: {e}")
            self.send_message(user_id, "❌ 删除关键词失败,请稍后重试")
        finally:
            conn.close()
    
    def handle_list_keywords(self, user_id):
        """处理列出关键词命令"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("SELECT keyword FROM user_keywords WHERE user_id = ?", (user_id,))
            keywords = [row[0] for row in c.fetchall()]
            
            if keywords:
                keyword_list = "\n".join([f"• {kw}" for kw in keywords])
                self.send_message(user_id, f"📋 你的关键词列表:\n{keyword_list}")
            else:
                self.send_message(user_id, "📝 你还没有添加任何关键词\n使用 /add 关键词 来添加")
        except Exception as e:
            logger.error(f"列出关键词失败: {e}")
            self.send_message(user_id, "❌ 获取关键词列表失败,请稍后重试")
        finally:
            conn.close()
    
    def handle_help(self, user_id):
        """处理帮助命令"""
        help_text = """
📖 使用指南:

🔍 添加关键词:
/add AI
/add 游戏
/add 生活

🗑️ 删除关键词:
/delete AI

📋 查看关键词列表:
/list

📊 查看机器人状态:
/status

⚙️ 机器人会定期检查 DeepFlood 社区的新内容,当发现匹配你关键词的帖子时,会立即推送给你!
        """
        self.send_message(user_id, help_text)
    
    def process_updates(self):
        """处理消息更新"""
        while True:
            try:
                updates = self.get_updates()
                for update in updates:
                    self.last_update_id = update["update_id"]
                    self.handle_message(update)
                time.sleep(1)
            except Exception as e:
                logger.error(f"处理更新时出错: {e}")
                time.sleep(5)
    
    def get_all_user_keywords(self):
        """获取所有用户的关键词"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("SELECT user_id, keyword FROM user_keywords")
            results = c.fetchall()
            
            # 按用户ID组织关键词
            user_keywords = {}
            for user_id, keyword in results:
                if user_id not in user_keywords:
                    user_keywords[user_id] = []
                user_keywords[user_id].append(keyword)
            
            return user_keywords
        except Exception as e:
            logger.error(f"获取用户关键词失败: {e}")
            return {}
        finally:
            conn.close()
    
    def is_post_sent(self, guid):
        """检查帖子是否已发送"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("SELECT 1 FROM sent_posts WHERE post_guid = ?", (guid,))
            result = c.fetchone() is not None
            return result
        except Exception as e:
            logger.error(f"检查帖子发送状态失败: {e}")
            return True  # 如果出错,默认已发送,避免重复发送
        finally:
            conn.close()
    
    def mark_post_sent(self, guid):
        """标记帖子为已发送"""
        conn = sqlite3.connect('keywords.db', check_same_thread=False)
        c = conn.cursor()
        
        try:
            c.execute("INSERT OR REPLACE INTO sent_posts (post_guid, sent_time) VALUES (?, ?)", 
                     (guid, datetime.now()))
            conn.commit()
        except Exception as e:
            logger.error(f"标记帖子为已发送失败: {e}")
        finally:
            conn.close()
    
    def check_rss_feeds(self):
        """检查RSS订阅"""
        while True:
            try:
                logger.info("开始检查RSS feeds...")
                # 使用带代理支持的RSS获取方法
                feed = self.get_rss_with_proxy()
                
                if feed.entries:
                    user_keywords = self.get_all_user_keywords()
                    
                    for entry in feed.entries:
                        guid = entry.get('id', entry.get('link', ''))
                        
                        # 检查是否已经发送过
                        if self.is_post_sent(guid):
                            continue
                        
                        # 获取条目内容
                        title = entry.get('title', '')
                        description = entry.get('description', '')
                        link = entry.get('link', '')
                        published = entry.get('published', '')
                        author = entry.get('author', '未知作者')
                        
                        content = f"{title} {description}"
                        
                        # 检查每个用户的关键词
                        for user_id, keywords in user_keywords.items():
                            for keyword in keywords:
                                if keyword.lower() in content.lower():
                                    # 发送通知
                                    message = f"""
🎯 发现匹配内容!

📖 标题:{title}
👤 作者:{author}
📅 发布时间:{published}
🔍 匹配关键词:{keyword}

📝 内容摘要:
{description[:200]}{'...' if len(description) > 200 else ''}

🔗 阅读全文:{link}
                                    """
                                    if self.send_message(user_id, message):
                                        logger.info(f"已向用户 {user_id} 发送匹配内容")
                                    break  # 每个条目只发送一次给用户
                        
                        # 标记为已发送
                        self.mark_post_sent(guid)
                        logger.info(f"已处理新内容: {title}")
                else:
                    logger.info("没有找到新的RSS条目")
                
                # 每10秒检查一次
                time.sleep(10)
                
            except Exception as e:
                logger.error(f"检查RSS时出错: {e}")
                time.sleep(30)  # 出错时等待更长时间
    
    def start(self):
        """启动机器人"""
        logger.info(f"{BOT_NAME} 启动...")
        
        # 测试网络连接
        self.test_connection()
        
        # 启动消息处理线程
        message_thread = threading.Thread(target=self.process_updates, daemon=True)
        message_thread.start()
        
        # 启动RSS检查线程
        rss_thread = threading.Thread(target=self.check_rss_feeds, daemon=True)
        rss_thread.start()
        
        logger.info(f"{BOT_NAME} 启动成功!")
        
        # 主线程保持运行
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("机器人停止")
    
    def test_connection(self):
        """测试网络连接"""
        logger.info("测试网络连接...")
        try:
            # 测试Telegram API连接
            response = requests.get(f"{self.base_url}/getMe", timeout=10, proxies=self.proxies)
            if response.status_code == 200:
                logger.info("✅ Telegram API 连接成功")
            else:
                logger.error(f"❌ Telegram API 连接失败: {response.text}")
            
            # 测试RSS连接
            rss_response = requests.get(RSS_URL, timeout=10, proxies=self.proxies)
            if rss_response.status_code == 200:
                logger.info("✅ RSS 源连接成功")
            else:
                logger.error(f"❌ RSS 源连接失败: {rss_response.status_code}")
                
        except Exception as e:
            logger.error(f"❌ 网络连接测试失败: {e}")

def main():
    # 检查依赖
    try:
        import requests
        import feedparser
        import sqlite3
    except ImportError as e:
        print(f"缺少依赖包: {e}")
        print("请运行: pip install requests feedparser")
        return
    
    # 检查token
    if BOT_TOKEN == "YOUR_BOT_TOKEN_HERE":
        print("❌ 请先设置你的Telegram Bot Token")
        print("1. 在Telegram中联系 @BotFather")
        print("2. 发送 /newbot 创建新机器人")
        print("3. 获取token并替换代码中的 YOUR_BOT_TOKEN_HERE")
        return
    
    # 显示代理设置
    if PROXY:
        print(f"使用代理: {PROXY}")
    else:
        print("不使用代理")
    
    # 启动机器人
    bot = TelegramBot()
    bot.start()

if __name__ == '__main__':
    main()

然后运行就可以!
怎么申请机器人不用我说吧大概...... ac08
这个应该没有自己传命令到telegram菜单的功能
所以请你自己将以下命令手动传输啦~ ac13

/add  - 添加监控关键词
/delete - 删除关键词
/list - 查看我的关键词列表
/status - 查看机器人状态
/help - 显示帮助信息

后记

欢迎各位给个鼓励!有bug及时反馈就行 ac21

1234
1234

你好啊,陌生人!

我的朋友,看起来你是新来的,如果想参与到讨论中,点击下面的按钮!

📈用户数目📈

目前论坛共有43595位seeker

🎉欢迎新用户🎉