上传文件至 lib

This commit is contained in:
2026-03-21 13:07:08 +08:00
parent bbd45e8a51
commit 426b3252c2

503
lib/吃瓜网2.py Normal file
View File

@@ -0,0 +1,503 @@
# 导入基础模块JSON处理、正则表达式、系统配置、加密、Base64编解码、URL解析
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
# 导入第三方模块网络请求、AES加密、HTML解析
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
# 添加上级目录到系统路径,用于导入基础爬虫类
sys.path.append('..')
from base.spider import Spider as BaseSpider
# 图片缓存字典:用于缓存解密后的图片,避免重复请求
img_cache = {}
# 自定义爬虫类继承基础爬虫类BaseSpider
class Spider(BaseSpider):
# 初始化方法:接收代理配置,设置请求头,获取可用站点域名
def init(self, extend=""):
try:
# 尝试解析传入的代理配置JSON格式
self.proxies = json.loads(extend)
except:
# 解析失败则使用空代理
self.proxies = {}
# 设置请求头模拟Chrome浏览器避免被反爬
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
# 获取可用的站点域名(从备选列表中检测)
self.host = self.get_working_host()
# 更新请求头添加Origin和Referer模拟正常浏览器请求
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
# 获取爬虫名称:用于前端展示
def getName(self):
return "🌈 吃瓜网|终极完美版"
# 判断URL是否为视频格式支持m3u8/mp4/ts流媒体格式
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
# 手动视频检查开关返回False表示无需手动检查
def manualVideoCheck(self):
return False
# 销毁方法:清理图片缓存,释放内存
def destroy(self):
global img_cache
img_cache.clear()
# 检测可用的站点域名:遍历备选列表,返回第一个可访问的域名
def get_working_host(self):
# 备选站点列表(应对域名失效问题)
dynamic_urls = [
'https://cgw.xwrfsps.cc/',
'https://dlx1w76jjz2r7.cloudfront.net/',
'https://cgw321.com/'
]
for url in dynamic_urls:
try:
# 发送GET请求检测站点是否可用超时10秒
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
# 站点不可用则继续检测下一个
continue
# 所有站点都不可用时返回第一个备选域名
return dynamic_urls[0]
# 获取首页内容:返回分类列表和首页视频列表
def homeContent(self, filter):
try:
# 发送请求获取首页HTML
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
# 解析HTML内容
data = self.getpq(response.text)
# 提取分类列表:尝试多种选择器适配不同页面结构
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
# 过滤无效分类(空链接/空名称)
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
# 提取到分类后停止遍历选择器
if classes: break
# 未提取到分类时使用默认分类(最新/热门)
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
# 返回分类列表和首页视频列表
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
# 异常时返回空数据
return {'class': [], 'list': []}
# 获取首页视频内容仅返回视频列表简化版homeContent
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
# 获取分类内容根据分类ID、页码返回对应视频列表
def categoryContent(self, tid, pg, filter, extend):
try:
# 处理文件夹类型的分类(特殊标识@folder
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
# 处理页码确保为整数默认第1页
pg = int(pg) if pg else 1
# 拼接分类URL支持完整URL或相对路径
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
# 拼接分页URL第1页无页码后续页码拼接在URL后
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
# 发送请求获取分类页面
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
# 解析视频列表
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
# 返回视频列表和分页信息(页码/总页数/每页条数/总条数)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
# 获取视频详情根据视频ID解析播放源和视频信息
def detailContent(self, ids):
try:
# 拼接详情页URL支持完整URL或相对路径
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
# 解析播放源列表:格式为"集名$播放链接",多集用#分隔
plist = []
used_names = set()
# 优先解析DPlayer播放器主流视频播放器
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
# 解析DPlayer的配置参数JSON格式
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
# 提取集名:从上级节点的标题中获取
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
# 处理重复集名:自动添加序号
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
# 添加到播放源列表
plist.append(f"{name}${video_url}")
except: continue
# 备用解析方式从文章链接中提取播放源兼容非DPlayer播放器
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
# 筛选包含播放关键词的链接
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
# 补全相对链接
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
# 拼接播放源字符串:多集用#分隔
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
# 提取视频标签/描述信息
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
# 提取标签链接
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
# 按标签名称长度降序排序:优先保留长标签(更精准)
candidates.sort(key=lambda x: len(x['name']), reverse=True)
# 去重并生成标签链接
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
# 拼接标签字符串,无标签则使用标题
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
# 兜底无标签时使用H1标题
if not vod_content:
vod_content = data('h1').text() or '吃瓜网'
# 返回视频详情:播放源、标签等
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
# 异常时返回失败信息
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': '获取失败'}]}
# 搜索功能:根据关键词返回搜索结果
def searchContent(self, key, quick, pg="1"):
try:
# 处理页码
pg = int(pg) if pg else 1
# 拼接搜索URL
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
# 发送搜索请求
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
# 解析搜索结果并返回
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
# 播放器内容处理:返回视频播放链接和解析方式
def playerContent(self, flag, id, vipFlags):
# 判断是否需要解析视频格式直接播放parse=0否则需要解析parse=1
parse = 0 if self.isVideoFormat(id) else 1
# m3u8格式使用代理其他格式直接返回链接
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
# 本地代理处理处理图片缓存、图片解密、m3u8/ts流媒体代理
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
# 缓存类型:返回缓存的图片内容
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
# 图片类型:解密并返回图片内容
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
# m3u8类型处理流媒体播放列表补全TS文件链接
elif type_ == 'm3u8':
return self.m3Proxy(url)
# 默认处理TS文件代理
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
# 生成代理链接:为流媒体添加代理(支持加密传输)
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
# m3u8代理处理解析播放列表补全TS文件绝对路径并添加代理
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
# 提取播放列表基础URL用于补全相对路径
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
# 跳过EXT标记行仅处理TS文件链接
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
# 为TS文件添加代理
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
# 返回处理后的m3u8内容
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
# TS文件代理解密并返回TS文件内容
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
# Base64编码用于URL安全传输
def e64(self, text):
return b64encode(str(text).encode()).decode()
# Base64解码解析编码后的URL
def d64(self, text):
return b64decode(str(text).encode()).decode()
# AES解密图片处理加密的图片内容支持CBC/ECB两种模式
def aesimg(self, data):
if len(data) < 16: return data
# 预设解密密钥对(应对不同加密方式)
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
# CBC模式解密
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
# 验证解密结果JPG/PNG图片头标识
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
# ECB模式解密备用
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
# 解密失败返回原数据
return data
# 解析视频列表从HTML元素中提取视频ID、标题、封面、备注等信息
def getlist(self, data, tid=''):
videos = []
# 判断是否为文件夹类型分类
is_folder = '/mrdg' in (tid or '')
for k in data.items():
# 获取视频卡片完整HTML用于解析封面
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
# 提取视频链接
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
# 提取视频标题(支持多种选择器)
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
# 过滤无效视频项(无链接/无标题)
if href and title:
# 提取视频封面图片
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}", # 视频ID文件夹类型添加特殊标识
'vod_name': title.strip(), # 视频名称
'vod_pic': img, # 封面图片链接
'vod_remarks': k('time').text() or '', # 发布时间/备注
'vod_tag': 'folder' if is_folder else '', # 标签(文件夹标识)
'style': {"type": "rect", "ratio": 1.33} # 封面样式
})
return videos
# 解析文件夹类型内容:特殊分类的视频列表解析
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
# 按H2标题分组解析视频项
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2) # 文本行(含链接)
p_img = data('.post-content p').eq(i * 2 + 1)# 图片行(含封面)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'), # 视频链接
'vod_name': p_txt.text().strip(), # 视频名称
'vod_pic': self.getimg('', p_img, p_html), # 封面图片
'vod_remarks': h2.text().strip() # 备注H2标题
})
return videos
# 提取图片链接:支持多种图片存储方式(脚本/直接链接/CSS背景图
def getimg(self, text, elem=None, html_content=None):
# 从脚本中提取图片链接loadBannerDirect函数
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
\# 处理HTML内容优先使用传入的无则从元素提取
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
\# 转义HTML实体便于正则匹配
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
\# 匹配base64格式的图片data:image
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
\# 匹配普通图片链接http/https开头的jpg/png等
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
\# 匹配CSS背景图url()格式)
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
# 无图片时返回空
return ''
# 处理图片URL解密base64图片添加代理缓存图片内容
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
# 处理base64格式图片
if url.startswith('data:'):
try:
# 分割data:image头和base64内容
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
# 解密加密的图片内容
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
# 生成MD5缓存键缓存图片内容
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
# 返回缓存代理链接
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
# 补全相对路径的图片链接
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
# 返回图片代理链接(加密传输)
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
# 封装PyQuery解析兼容编码问题
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))