Files
files/lib/吃瓜网2.py
2026-03-21 13:07:08 +08:00

503 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 导入基础模块JSON处理、正则表达式、系统配置、加密、Base64编解码、URL解析
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
# 导入第三方模块网络请求、AES加密、HTML解析
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
# 添加上级目录到系统路径,用于导入基础爬虫类
sys.path.append('..')
from base.spider import Spider as BaseSpider
# 图片缓存字典:用于缓存解密后的图片,避免重复请求
img_cache = {}
# 自定义爬虫类继承基础爬虫类BaseSpider
class Spider(BaseSpider):
# 初始化方法:接收代理配置,设置请求头,获取可用站点域名
def init(self, extend=""):
try:
# 尝试解析传入的代理配置JSON格式
self.proxies = json.loads(extend)
except:
# 解析失败则使用空代理
self.proxies = {}
# 设置请求头模拟Chrome浏览器避免被反爬
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
# 获取可用的站点域名(从备选列表中检测)
self.host = self.get_working_host()
# 更新请求头添加Origin和Referer模拟正常浏览器请求
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
# 获取爬虫名称:用于前端展示
def getName(self):
return "🌈 吃瓜网|终极完美版"
# 判断URL是否为视频格式支持m3u8/mp4/ts流媒体格式
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
# 手动视频检查开关返回False表示无需手动检查
def manualVideoCheck(self):
return False
# 销毁方法:清理图片缓存,释放内存
def destroy(self):
global img_cache
img_cache.clear()
# 检测可用的站点域名:遍历备选列表,返回第一个可访问的域名
def get_working_host(self):
# 备选站点列表(应对域名失效问题)
dynamic_urls = [
'https://cgw.xwrfsps.cc/',
'https://dlx1w76jjz2r7.cloudfront.net/',
'https://cgw321.com/'
]
for url in dynamic_urls:
try:
# 发送GET请求检测站点是否可用超时10秒
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
# 站点不可用则继续检测下一个
continue
# 所有站点都不可用时返回第一个备选域名
return dynamic_urls[0]
# 获取首页内容:返回分类列表和首页视频列表
def homeContent(self, filter):
try:
# 发送请求获取首页HTML
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
# 解析HTML内容
data = self.getpq(response.text)
# 提取分类列表:尝试多种选择器适配不同页面结构
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
# 过滤无效分类(空链接/空名称)
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
# 提取到分类后停止遍历选择器
if classes: break
# 未提取到分类时使用默认分类(最新/热门)
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
# 返回分类列表和首页视频列表
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
# 异常时返回空数据
return {'class': [], 'list': []}
# 获取首页视频内容仅返回视频列表简化版homeContent
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
# 获取分类内容根据分类ID、页码返回对应视频列表
def categoryContent(self, tid, pg, filter, extend):
try:
# 处理文件夹类型的分类(特殊标识@folder
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
# 处理页码确保为整数默认第1页
pg = int(pg) if pg else 1
# 拼接分类URL支持完整URL或相对路径
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
# 拼接分页URL第1页无页码后续页码拼接在URL后
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
# 发送请求获取分类页面
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
# 解析视频列表
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
# 返回视频列表和分页信息(页码/总页数/每页条数/总条数)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
# 获取视频详情根据视频ID解析播放源和视频信息
def detailContent(self, ids):
try:
# 拼接详情页URL支持完整URL或相对路径
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
# 解析播放源列表:格式为"集名$播放链接",多集用#分隔
plist = []
used_names = set()
# 优先解析DPlayer播放器主流视频播放器
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
# 解析DPlayer的配置参数JSON格式
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
# 提取集名:从上级节点的标题中获取
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
# 处理重复集名:自动添加序号
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
# 添加到播放源列表
plist.append(f"{name}${video_url}")
except: continue
# 备用解析方式从文章链接中提取播放源兼容非DPlayer播放器
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
# 筛选包含播放关键词的链接
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
# 补全相对链接
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
# 拼接播放源字符串:多集用#分隔
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
# 提取视频标签/描述信息
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
# 提取标签链接
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
# 按标签名称长度降序排序:优先保留长标签(更精准)
candidates.sort(key=lambda x: len(x['name']), reverse=True)
# 去重并生成标签链接
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
# 拼接标签字符串,无标签则使用标题
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
# 兜底无标签时使用H1标题
if not vod_content:
vod_content = data('h1').text() or '吃瓜网'
# 返回视频详情:播放源、标签等
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
# 异常时返回失败信息
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': '获取失败'}]}
# 搜索功能:根据关键词返回搜索结果
def searchContent(self, key, quick, pg="1"):
try:
# 处理页码
pg = int(pg) if pg else 1
# 拼接搜索URL
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
# 发送搜索请求
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
# 解析搜索结果并返回
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
# 播放器内容处理:返回视频播放链接和解析方式
def playerContent(self, flag, id, vipFlags):
# 判断是否需要解析视频格式直接播放parse=0否则需要解析parse=1
parse = 0 if self.isVideoFormat(id) else 1
# m3u8格式使用代理其他格式直接返回链接
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
# 本地代理处理处理图片缓存、图片解密、m3u8/ts流媒体代理
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
# 缓存类型:返回缓存的图片内容
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
# 图片类型:解密并返回图片内容
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
# m3u8类型处理流媒体播放列表补全TS文件链接
elif type_ == 'm3u8':
return self.m3Proxy(url)
# 默认处理TS文件代理
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
# 生成代理链接:为流媒体添加代理(支持加密传输)
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
# m3u8代理处理解析播放列表补全TS文件绝对路径并添加代理
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
# 提取播放列表基础URL用于补全相对路径
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
# 跳过EXT标记行仅处理TS文件链接
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
# 为TS文件添加代理
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
# 返回处理后的m3u8内容
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
# TS文件代理解密并返回TS文件内容
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
# Base64编码用于URL安全传输
def e64(self, text):
return b64encode(str(text).encode()).decode()
# Base64解码解析编码后的URL
def d64(self, text):
return b64decode(str(text).encode()).decode()
# AES解密图片处理加密的图片内容支持CBC/ECB两种模式
def aesimg(self, data):
if len(data) < 16: return data
# 预设解密密钥对(应对不同加密方式)
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
# CBC模式解密
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
# 验证解密结果JPG/PNG图片头标识
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
# ECB模式解密备用
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
# 解密失败返回原数据
return data
# 解析视频列表从HTML元素中提取视频ID、标题、封面、备注等信息
def getlist(self, data, tid=''):
videos = []
# 判断是否为文件夹类型分类
is_folder = '/mrdg' in (tid or '')
for k in data.items():
# 获取视频卡片完整HTML用于解析封面
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
# 提取视频链接
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
# 提取视频标题(支持多种选择器)
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
# 过滤无效视频项(无链接/无标题)
if href and title:
# 提取视频封面图片
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}", # 视频ID文件夹类型添加特殊标识
'vod_name': title.strip(), # 视频名称
'vod_pic': img, # 封面图片链接
'vod_remarks': k('time').text() or '', # 发布时间/备注
'vod_tag': 'folder' if is_folder else '', # 标签(文件夹标识)
'style': {"type": "rect", "ratio": 1.33} # 封面样式
})
return videos
# 解析文件夹类型内容:特殊分类的视频列表解析
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
# 按H2标题分组解析视频项
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2) # 文本行(含链接)
p_img = data('.post-content p').eq(i * 2 + 1)# 图片行(含封面)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'), # 视频链接
'vod_name': p_txt.text().strip(), # 视频名称
'vod_pic': self.getimg('', p_img, p_html), # 封面图片
'vod_remarks': h2.text().strip() # 备注H2标题
})
return videos
# 提取图片链接:支持多种图片存储方式(脚本/直接链接/CSS背景图
def getimg(self, text, elem=None, html_content=None):
# 从脚本中提取图片链接loadBannerDirect函数
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
\# 处理HTML内容优先使用传入的无则从元素提取
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
\# 转义HTML实体便于正则匹配
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
\# 匹配base64格式的图片data:image
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
\# 匹配普通图片链接http/https开头的jpg/png等
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
\# 匹配CSS背景图url()格式)
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
# 无图片时返回空
return ''
# 处理图片URL解密base64图片添加代理缓存图片内容
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
# 处理base64格式图片
if url.startswith('data:'):
try:
# 分割data:image头和base64内容
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
# 解密加密的图片内容
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
# 生成MD5缓存键缓存图片内容
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
# 返回缓存代理链接
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
# 补全相对路径的图片链接
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
# 返回图片代理链接(加密传输)
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
# 封装PyQuery解析兼容编码问题
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))