前言
参考的ns类似的机器人 这个机器人写起来不能说有难度 只能说官方给的条件很成熟 感谢酒神给的rss链接!
如何部署
由于我还没来得及把他扔github上 先把代码贴这里吧
安装下依赖:
pip install python-telegram-bot feedparser apscheduler
然后复制一波我这个代码:
import logging
import sqlite3
import feedparser
import requests
import time
import threading
from datetime import datetime
import os
# 配置日志
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# 机器人配置
BOT_TOKEN = "552212031:srdgkijiejqiowejvi12u3hf" # 请替换为你的机器人token
RSS_URL = "https://feed.deepflood.com/topic.rss.xml"
BOT_NAME = "DeepFlood关键词通知机器人"
# 代理配置 - 修改这里来设置代理
# 格式: "http://用户名:密码@代理地址:端口" 或 "http://代理地址:端口"
# 如果不需要代理,设置为 None 或 ""
PROXY = ""# 示例: "http://127.0.0.1:1080" 或 "http://user:[email protected]:8080"
class TelegramBot:
def __init__(self):
self.token = BOT_TOKEN
self.base_url = f"https://api.telegram.org/bot{self.token}"
self.last_update_id = 0
self.proxies = self.setup_proxy()
self.init_db()
def setup_proxy(self):
"""设置代理"""
if PROXY and PROXY.strip():
logger.info(f"使用代理: {PROXY}")
return {
"http": PROXY,
"https": PROXY
}
else:
logger.info("不使用代理")
return None
def init_db(self):
"""初始化数据库"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS user_keywords
(user_id INTEGER, keyword TEXT, UNIQUE(user_id, keyword))
''')
c.execute('''
CREATE TABLE IF NOT EXISTS sent_posts
(post_guid TEXT PRIMARY KEY, sent_time TIMESTAMP)
''')
conn.commit()
conn.close()
logger.info("数据库初始化完成")
def send_message(self, chat_id, text):
"""发送消息到Telegram"""
url = f"{self.base_url}/sendMessage"
data = {
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML"
}
try:
response = requests.post(url, data=data, timeout=10, proxies=self.proxies)
if response.status_code == 200:
return True
else:
logger.error(f"发送消息失败: {response.text}")
return False
except Exception as e:
logger.error(f"发送消息异常: {e}")
return False
def get_updates(self):
"""获取Telegram更新"""
url = f"{self.base_url}/getUpdates"
params = {
"offset": self.last_update_id + 1,
"timeout": 30
}
try:
response = requests.get(url, params=params, timeout=35, proxies=self.proxies)
if response.status_code == 200:
data = response.json()
if data.get("ok"):
return data.get("result", [])
else:
logger.error(f"获取更新失败: {response.text}")
except Exception as e:
logger.error(f"获取更新异常: {e}")
return []
def get_rss_with_proxy(self):
"""使用代理获取RSS内容"""
try:
response = requests.get(RSS_URL, timeout=10, proxies=self.proxies)
if response.status_code == 200:
# 使用feedparser解析响应内容
return feedparser.parse(response.content)
else:
logger.error(f"获取RSS失败,状态码: {response.status_code}")
return feedparser.parse("") # 返回空的feed对象
except Exception as e:
logger.error(f"通过代理获取RSS失败: {e}")
# 如果代理失败,尝试不使用代理
if self.proxies:
logger.info("尝试不使用代理获取RSS...")
try:
return feedparser.parse(RSS_URL)
except Exception as e2:
logger.error(f"不使用代理获取RSS也失败: {e2}")
return feedparser.parse("") # 返回空的feed对象
def handle_message(self, update):
"""处理消息"""
message = update.get("message", {})
user_id = message.get("from", {}).get("id")
text = message.get("text", "").strip()
if not text or not user_id:
return
logger.info(f"收到消息 from {user_id}: {text}")
if text.startswith("/start"):
self.handle_start(user_id)
elif text.startswith("/add"):
self.handle_add_keyword(user_id, text)
elif text.startswith("/delete"):
self.handle_delete_keyword(user_id, text)
elif text.startswith("/list"):
self.handle_list_keywords(user_id)
elif text.startswith("/help"):
self.handle_help(user_id)
elif text.startswith("/status"):
self.handle_status(user_id)
else:
self.send_message(user_id, "❓ 未知命令,使用 /help 查看可用命令")
def handle_start(self, user_id):
"""处理开始命令"""
welcome_text = f"""
🤖 欢迎使用 {BOT_NAME}!
使用方法:
/add 关键词 - 添加监控关键词
/delete 关键词 - 删除关键词
/list - 查看我的关键词列表
/status - 查看机器人状态
/help - 显示帮助信息
当 DeepFlood 社区有新内容匹配你的关键词时,我会立即通知你!
"""
self.send_message(user_id, welcome_text)
def handle_status(self, user_id):
"""处理状态命令"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
# 获取用户的关键词数量
c.execute("SELECT COUNT(*) FROM user_keywords WHERE user_id = ?", (user_id,))
keyword_count = c.fetchone()[0]
# 获取总用户数
c.execute("SELECT COUNT(DISTINCT user_id) FROM user_keywords")
total_users = c.fetchone()[0]
# 获取总关键词数
c.execute("SELECT COUNT(*) FROM user_keywords")
total_keywords = c.fetchone()[0]
status_text = f"""
📊 机器人状态:
👤 你的关键词数量: {keyword_count}
👥 总用户数: {total_users}
🔍 总监控关键词数: {total_keywords}
🌐 代理状态: {'已启用' if self.proxies else '未启用'}
🔄 检查频率: 每10秒一次
"""
self.send_message(user_id, status_text)
except Exception as e:
logger.error(f"获取状态失败: {e}")
self.send_message(user_id, "❌ 获取状态失败")
finally:
conn.close()
def handle_add_keyword(self, user_id, text):
"""处理添加关键词命令"""
parts = text.split(" ", 1)
if len(parts) < 2:
self.send_message(user_id, "❌ 请提供要添加的关键词\n用法: /add 关键词")
return
keyword = parts[1].strip()
if not keyword:
self.send_message(user_id, "❌ 关键词不能为空")
return
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("INSERT INTO user_keywords (user_id, keyword) VALUES (?, ?)", (user_id, keyword))
conn.commit()
self.send_message(user_id, f"✅ 已添加关键词: {keyword}")
logger.info(f"用户 {user_id} 添加关键词: {keyword}")
except sqlite3.IntegrityError:
self.send_message(user_id, f"⚠️ 关键词 '{keyword}' 已经存在")
except Exception as e:
logger.error(f"添加关键词失败: {e}")
self.send_message(user_id, "❌ 添加关键词失败,请稍后重试")
finally:
conn.close()
def handle_delete_keyword(self, user_id, text):
"""处理删除关键词命令"""
parts = text.split(" ", 1)
if len(parts) < 2:
self.send_message(user_id, "❌ 请提供要删除的关键词\n用法: /delete 关键词")
return
keyword = parts[1].strip()
if not keyword:
self.send_message(user_id, "❌ 关键词不能为空")
return
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("DELETE FROM user_keywords WHERE user_id = ? AND keyword = ?", (user_id, keyword))
conn.commit()
if c.rowcount > 0:
self.send_message(user_id, f"✅ 已删除关键词: {keyword}")
logger.info(f"用户 {user_id} 删除关键词: {keyword}")
else:
self.send_message(user_id, f"❌ 未找到关键词: {keyword}")
except Exception as e:
logger.error(f"删除关键词失败: {e}")
self.send_message(user_id, "❌ 删除关键词失败,请稍后重试")
finally:
conn.close()
def handle_list_keywords(self, user_id):
"""处理列出关键词命令"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("SELECT keyword FROM user_keywords WHERE user_id = ?", (user_id,))
keywords = [row[0] for row in c.fetchall()]
if keywords:
keyword_list = "\n".join([f"• {kw}" for kw in keywords])
self.send_message(user_id, f"📋 你的关键词列表:\n{keyword_list}")
else:
self.send_message(user_id, "📝 你还没有添加任何关键词\n使用 /add 关键词 来添加")
except Exception as e:
logger.error(f"列出关键词失败: {e}")
self.send_message(user_id, "❌ 获取关键词列表失败,请稍后重试")
finally:
conn.close()
def handle_help(self, user_id):
"""处理帮助命令"""
help_text = """
📖 使用指南:
🔍 添加关键词:
/add AI
/add 游戏
/add 生活
🗑️ 删除关键词:
/delete AI
📋 查看关键词列表:
/list
📊 查看机器人状态:
/status
⚙️ 机器人会定期检查 DeepFlood 社区的新内容,当发现匹配你关键词的帖子时,会立即推送给你!
"""
self.send_message(user_id, help_text)
def process_updates(self):
"""处理消息更新"""
while True:
try:
updates = self.get_updates()
for update in updates:
self.last_update_id = update["update_id"]
self.handle_message(update)
time.sleep(1)
except Exception as e:
logger.error(f"处理更新时出错: {e}")
time.sleep(5)
def get_all_user_keywords(self):
"""获取所有用户的关键词"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("SELECT user_id, keyword FROM user_keywords")
results = c.fetchall()
# 按用户ID组织关键词
user_keywords = {}
for user_id, keyword in results:
if user_id not in user_keywords:
user_keywords[user_id] = []
user_keywords[user_id].append(keyword)
return user_keywords
except Exception as e:
logger.error(f"获取用户关键词失败: {e}")
return {}
finally:
conn.close()
def is_post_sent(self, guid):
"""检查帖子是否已发送"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("SELECT 1 FROM sent_posts WHERE post_guid = ?", (guid,))
result = c.fetchone() is not None
return result
except Exception as e:
logger.error(f"检查帖子发送状态失败: {e}")
return True # 如果出错,默认已发送,避免重复发送
finally:
conn.close()
def mark_post_sent(self, guid):
"""标记帖子为已发送"""
conn = sqlite3.connect('keywords.db', check_same_thread=False)
c = conn.cursor()
try:
c.execute("INSERT OR REPLACE INTO sent_posts (post_guid, sent_time) VALUES (?, ?)",
(guid, datetime.now()))
conn.commit()
except Exception as e:
logger.error(f"标记帖子为已发送失败: {e}")
finally:
conn.close()
def check_rss_feeds(self):
"""检查RSS订阅"""
while True:
try:
logger.info("开始检查RSS feeds...")
# 使用带代理支持的RSS获取方法
feed = self.get_rss_with_proxy()
if feed.entries:
user_keywords = self.get_all_user_keywords()
for entry in feed.entries:
guid = entry.get('id', entry.get('link', ''))
# 检查是否已经发送过
if self.is_post_sent(guid):
continue
# 获取条目内容
title = entry.get('title', '')
description = entry.get('description', '')
link = entry.get('link', '')
published = entry.get('published', '')
author = entry.get('author', '未知作者')
content = f"{title} {description}"
# 检查每个用户的关键词
for user_id, keywords in user_keywords.items():
for keyword in keywords:
if keyword.lower() in content.lower():
# 发送通知
message = f"""
🎯 发现匹配内容!
📖 标题:{title}
👤 作者:{author}
📅 发布时间:{published}
🔍 匹配关键词:{keyword}
📝 内容摘要:
{description[:200]}{'...' if len(description) > 200 else ''}
🔗 阅读全文:{link}
"""
if self.send_message(user_id, message):
logger.info(f"已向用户 {user_id} 发送匹配内容")
break # 每个条目只发送一次给用户
# 标记为已发送
self.mark_post_sent(guid)
logger.info(f"已处理新内容: {title}")
else:
logger.info("没有找到新的RSS条目")
# 每10秒检查一次
time.sleep(10)
except Exception as e:
logger.error(f"检查RSS时出错: {e}")
time.sleep(30) # 出错时等待更长时间
def start(self):
"""启动机器人"""
logger.info(f"{BOT_NAME} 启动...")
# 测试网络连接
self.test_connection()
# 启动消息处理线程
message_thread = threading.Thread(target=self.process_updates, daemon=True)
message_thread.start()
# 启动RSS检查线程
rss_thread = threading.Thread(target=self.check_rss_feeds, daemon=True)
rss_thread.start()
logger.info(f"{BOT_NAME} 启动成功!")
# 主线程保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("机器人停止")
def test_connection(self):
"""测试网络连接"""
logger.info("测试网络连接...")
try:
# 测试Telegram API连接
response = requests.get(f"{self.base_url}/getMe", timeout=10, proxies=self.proxies)
if response.status_code == 200:
logger.info("✅ Telegram API 连接成功")
else:
logger.error(f"❌ Telegram API 连接失败: {response.text}")
# 测试RSS连接
rss_response = requests.get(RSS_URL, timeout=10, proxies=self.proxies)
if rss_response.status_code == 200:
logger.info("✅ RSS 源连接成功")
else:
logger.error(f"❌ RSS 源连接失败: {rss_response.status_code}")
except Exception as e:
logger.error(f"❌ 网络连接测试失败: {e}")
def main():
# 检查依赖
try:
import requests
import feedparser
import sqlite3
except ImportError as e:
print(f"缺少依赖包: {e}")
print("请运行: pip install requests feedparser")
return
# 检查token
if BOT_TOKEN == "YOUR_BOT_TOKEN_HERE":
print("❌ 请先设置你的Telegram Bot Token")
print("1. 在Telegram中联系 @BotFather")
print("2. 发送 /newbot 创建新机器人")
print("3. 获取token并替换代码中的 YOUR_BOT_TOKEN_HERE")
return
# 显示代理设置
if PROXY:
print(f"使用代理: {PROXY}")
else:
print("不使用代理")
# 启动机器人
bot = TelegramBot()
bot.start()
if __name__ == '__main__':
main()
然后运行就可以!
怎么申请机器人不用我说吧大概...... 
这个应该没有自己传命令到telegram菜单的功能
所以请你自己将以下命令手动传输啦~ 
/add - 添加监控关键词
/delete - 删除关键词
/list - 查看我的关键词列表
/status - 查看机器人状态
/help - 显示帮助信息
后记
欢迎各位给个鼓励!有bug及时反馈就行 
蹲一个docker版
牛 帮顶
@昶- #1 docker我没那技术 这个就已经够简单了 惭愧啊
@昶- #1
https://github.com/Heavrnl/TelegramForwarder 这个更强大哦。
@aessy #5 这个也可以 我这个是专注于df的 就是自己跑着玩的
已docker运行 谢谢楼主
@harper #7 求个镜像😊
@aessy #5 这个厉害,之后就捣鼓捣鼓
@harper #7 求个镜像