# 导入基础模块:JSON处理、正则表达式、系统配置、加密、Base64编解码、URL解析 import json import re import sys import hashlib from base64 import b64decode, b64encode from urllib.parse import urlparse # 导入第三方模块:网络请求、AES加密、HTML解析 import requests from Crypto.Cipher import AES from Crypto.Util.Padding import unpad from pyquery import PyQuery as pq # 添加上级目录到系统路径,用于导入基础爬虫类 sys.path.append('..') from base.spider import Spider as BaseSpider # 图片缓存字典:用于缓存解密后的图片,避免重复请求 img_cache = {} # 自定义爬虫类,继承基础爬虫类BaseSpider class Spider(BaseSpider): # 初始化方法:接收代理配置,设置请求头,获取可用站点域名 def init(self, extend=""): try: # 尝试解析传入的代理配置(JSON格式) self.proxies = json.loads(extend) except: # 解析失败则使用空代理 self.proxies = {} # 设置请求头:模拟Chrome浏览器,避免被反爬 self.headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache', } # 获取可用的站点域名(从备选列表中检测) self.host = self.get_working_host() # 更新请求头:添加Origin和Referer,模拟正常浏览器请求 self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"}) print(f"使用站点: {self.host}") # 获取爬虫名称:用于前端展示 def getName(self): return "🌈 吃瓜网|终极完美版" # 判断URL是否为视频格式:支持m3u8/mp4/ts流媒体格式 def isVideoFormat(self, url): return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts']) # 手动视频检查开关:返回False表示无需手动检查 def manualVideoCheck(self): return False # 销毁方法:清理图片缓存,释放内存 def destroy(self): global img_cache img_cache.clear() # 检测可用的站点域名:遍历备选列表,返回第一个可访问的域名 def get_working_host(self): # 备选站点列表(应对域名失效问题) dynamic_urls = [ 'https://cgw.xwrfsps.cc/', 'https://dlx1w76jjz2r7.cloudfront.net/', 'https://cgw321.com/' ] for url in dynamic_urls: try: # 发送GET请求检测站点是否可用,超时10秒 response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10) if response.status_code == 200: return url except Exception: # 站点不可用则继续检测下一个 continue # 所有站点都不可用时返回第一个备选域名 return dynamic_urls[0] # 获取首页内容:返回分类列表和首页视频列表 def homeContent(self, filter): try: # 发送请求获取首页HTML response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15) if response.status_code != 200: return {'class': [], 'list': []} # 解析HTML内容 data = self.getpq(response.text) # 提取分类列表:尝试多种选择器适配不同页面结构 classes = [] category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li'] for selector in category_selectors: for k in data(selector).items(): link = k('a') href = (link.attr('href') or '').strip() name = (link.text() or '').strip() # 过滤无效分类(空链接/空名称) if not href or href == '#' or not name: continue classes.append({'type_name': name, 'type_id': href}) # 提取到分类后停止遍历选择器 if classes: break # 未提取到分类时使用默认分类(最新/热门) if not classes: classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}] # 返回分类列表和首页视频列表 return {'class': classes, 'list': self.getlist(data('#index article, article'))} except Exception as e: # 异常时返回空数据 return {'class': [], 'list': []} # 获取首页视频内容:仅返回视频列表(简化版homeContent) def homeVideoContent(self): try: response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15) if response.status_code != 200: return {'list': []} data = self.getpq(response.text) return {'list': self.getlist(data('#index article, article'))} except Exception as e: return {'list': []} # 获取分类内容:根据分类ID、页码返回对应视频列表 def categoryContent(self, tid, pg, filter, extend): try: # 处理文件夹类型的分类(特殊标识@folder) if '@folder' in tid: v = self.getfod(tid.replace('@folder', '')) return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)} # 处理页码:确保为整数,默认第1页 pg = int(pg) if pg else 1 # 拼接分类URL:支持完整URL或相对路径 if tid.startswith('http'): base_url = tid.rstrip('/') else: path = tid if tid.startswith('/') else f"/{tid}" base_url = f"{self.host}{path}".rstrip('/') # 拼接分页URL:第1页无页码,后续页码拼接在URL后 if pg == 1: url = f"{base_url}/" else: url = f"{base_url}/{pg}/" # 发送请求获取分类页面 response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15) if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0} # 解析视频列表 data = self.getpq(response.text) videos = self.getlist(data('#archive article, #index article, article'), tid) # 返回视频列表和分页信息(页码/总页数/每页条数/总条数) return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999} except Exception as e: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0} # 获取视频详情:根据视频ID解析播放源和视频信息 def detailContent(self, ids): try: # 拼接详情页URL:支持完整URL或相对路径 url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}" response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15) data = self.getpq(response.text) # 解析播放源列表:格式为"集名$播放链接",多集用#分隔 plist = [] used_names = set() # 优先解析DPlayer播放器(主流视频播放器) if data('.dplayer'): for c, k in enumerate(data('.dplayer').items(), start=1): try: # 解析DPlayer的配置参数(JSON格式) config_attr = k.attr('data-config') if config_attr: config = json.loads(config_attr) video_url = config.get('video', {}).get('url', '') if video_url: # 提取集名:从上级节点的标题中获取 ep_name = '' parent = k.parents().eq(0) for _ in range(4): if not parent: break heading = parent.find('h2, h3, h4').eq(0).text().strip() if heading: ep_name = heading break parent = parent.parents().eq(0) # 处理重复集名:自动添加序号 base_name = ep_name if ep_name else f"视频{c}" name = base_name count = 2 while name in used_names: name = f"{base_name} {count}" count += 1 used_names.add(name) # 添加到播放源列表 plist.append(f"{name}${video_url}") except: continue # 备用解析方式:从文章链接中提取播放源(兼容非DPlayer播放器) if not plist: content_area = data('.post-content, article') for i, link in enumerate(content_area('a').items(), start=1): link_text = link.text().strip() link_href = link.attr('href') # 筛选包含播放关键词的链接 if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']): ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip() if not ep_name: ep_name = f"视频{i}" # 补全相对链接 if not link_href.startswith('http'): link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}" plist.append(f"{ep_name}${link_href}") # 拼接播放源字符串:多集用#分隔 play_url = '#'.join(plist) if plist else f"未找到视频源${url}" # 提取视频标签/描述信息 vod_content = '' try: tags = [] seen_names = set() seen_ids = set() # 提取标签链接 tag_links = data('.tags a, .keywords a, .post-tags a') candidates = [] for k in tag_links.items(): title = k.text().strip() href = k.attr('href') if title and href: candidates.append({'name': title, 'id': href}) # 按标签名称长度降序排序:优先保留长标签(更精准) candidates.sort(key=lambda x: len(x['name']), reverse=True) # 去重并生成标签链接 for item in candidates: name = item['name'] id_ = item['id'] if id_ in seen_ids: continue is_duplicate = False for seen in seen_names: if name in seen: is_duplicate = True break if not is_duplicate: target = json.dumps({'id': id_, 'name': name}) tags.append(f'[a=cr:{target}/]{name}[/a]') seen_names.add(name) seen_ids.add(id_) # 拼接标签字符串,无标签则使用标题 if tags: vod_content = ' '.join(tags) else: vod_content = data('.post-title').text() except Exception: vod_content = '获取标签失败' # 兜底:无标签时使用H1标题 if not vod_content: vod_content = data('h1').text() or '吃瓜网' # 返回视频详情:播放源、标签等 return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': play_url, 'vod_content': vod_content}]} except: # 异常时返回失败信息 return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': '获取失败'}]} # 搜索功能:根据关键词返回搜索结果 def searchContent(self, key, quick, pg="1"): try: # 处理页码 pg = int(pg) if pg else 1 # 拼接搜索URL if pg == 1: url = f"{self.host}/search/{key}/" else: url = f"{self.host}/search/{key}/{pg}/" # 发送搜索请求 response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15) # 解析搜索结果并返回 return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999} except: return {'list': [], 'page': pg, 'pagecount': 9999} # 播放器内容处理:返回视频播放链接和解析方式 def playerContent(self, flag, id, vipFlags): # 判断是否需要解析:视频格式直接播放(parse=0),否则需要解析(parse=1) parse = 0 if self.isVideoFormat(id) else 1 # m3u8格式使用代理,其他格式直接返回链接 url = self.proxy(id) if '.m3u8' in id else id return {'parse': parse, 'url': url, 'header': self.headers} # 本地代理处理:处理图片缓存、图片解密、m3u8/ts流媒体代理 def localProxy(self, param): try: type_ = param.get('type') url = param.get('url') # 缓存类型:返回缓存的图片内容 if type_ == 'cache': key = param.get('key') if content := img_cache.get(key): return [200, 'image/jpeg', content] return [404, 'text/plain', b'Expired'] # 图片类型:解密并返回图片内容 elif type_ == 'img': real_url = self.d64(url) if not url.startswith('http') else url res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10) content = self.aesimg(res.content) return [200, 'image/jpeg', content] # m3u8类型:处理流媒体播放列表,补全TS文件链接 elif type_ == 'm3u8': return self.m3Proxy(url) # 默认处理TS文件代理 else: return self.tsProxy(url) except: return [404, 'text/plain', b''] # 生成代理链接:为流媒体添加代理(支持加密传输) def proxy(self, data, type='m3u8'): if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}" return data # m3u8代理处理:解析播放列表,补全TS文件绝对路径并添加代理 def m3Proxy(self, url): url = self.d64(url) res = requests.get(url, headers=self.headers, proxies=self.proxies) data = res.text # 提取播放列表基础URL(用于补全相对路径) base = res.url.rsplit('/', 1)[0] lines = [] for line in data.split('\n'): # 跳过EXT标记行,仅处理TS文件链接 if '#EXT' not in line and line.strip(): if not line.startswith('http'): line = f"{base}/{line}" # 为TS文件添加代理 lines.append(self.proxy(line, 'ts')) else: lines.append(line) # 返回处理后的m3u8内容 return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)] # TS文件代理:解密并返回TS文件内容 def tsProxy(self, url): return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content] # Base64编码:用于URL安全传输 def e64(self, text): return b64encode(str(text).encode()).decode() # Base64解码:解析编码后的URL def d64(self, text): return b64decode(str(text).encode()).decode() # AES解密图片:处理加密的图片内容(支持CBC/ECB两种模式) def aesimg(self, data): if len(data) < 16: return data # 预设解密密钥对(应对不同加密方式) keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')] for k, v in keys: try: # CBC模式解密 dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16) # 验证解密结果:JPG/PNG图片头标识 if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec except: pass try: # ECB模式解密(备用) dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16) if dec.startswith(b'\xff\xd8'): return dec except: pass # 解密失败返回原数据 return data # 解析视频列表:从HTML元素中提取视频ID、标题、封面、备注等信息 def getlist(self, data, tid=''): videos = [] # 判断是否为文件夹类型分类 is_folder = '/mrdg' in (tid or '') for k in data.items(): # 获取视频卡片完整HTML(用于解析封面) card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k) # 提取视频链接 a = k if k.is_('a') else k('a').eq(0) href = a.attr('href') # 提取视频标题(支持多种选择器) title = k('h2').text() or k('.entry-title').text() or k('.post-title').text() if not title and k.is_('a'): title = k.text() # 过滤无效视频项(无链接/无标题) if href and title: # 提取视频封面图片 img = self.getimg(k('script').text(), k, card_html) videos.append({ 'vod_id': f"{href}{'@folder' if is_folder else ''}", # 视频ID(文件夹类型添加特殊标识) 'vod_name': title.strip(), # 视频名称 'vod_pic': img, # 封面图片链接 'vod_remarks': k('time').text() or '', # 发布时间/备注 'vod_tag': 'folder' if is_folder else '', # 标签(文件夹标识) 'style': {"type": "rect", "ratio": 1.33} # 封面样式 }) return videos # 解析文件夹类型内容:特殊分类的视频列表解析 def getfod(self, id): url = f"{self.host}{id}" data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text) videos = [] # 按H2标题分组解析视频项 for i, h2 in enumerate(data('.post-content h2').items()): p_txt = data('.post-content p').eq(i * 2) # 文本行(含链接) p_img = data('.post-content p').eq(i * 2 + 1)# 图片行(含封面) p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img) videos.append({ 'vod_id': p_txt('a').attr('href'), # 视频链接 'vod_name': p_txt.text().strip(), # 视频名称 'vod_pic': self.getimg('', p_img, p_html), # 封面图片 'vod_remarks': h2.text().strip() # 备注(H2标题) }) return videos # 提取图片链接:支持多种图片存储方式(脚本/直接链接/CSS背景图) def getimg(self, text, elem=None, html_content=None): # 从脚本中提取图片链接(loadBannerDirect函数) if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''): return self._proc_url(m.group(1)) \# 处理HTML内容(优先使用传入的,无则从元素提取) if html_content is None and elem is not None: html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem) if not html_content: return '' \# 转义HTML实体,便于正则匹配 html_content = html_content.replace('"', '"').replace(''', "'").replace('&', '&') \# 匹配base64格式的图片(data:image) if 'data:image' in html_content: m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content) if m: return self._proc_url(m.group(1)) \# 匹配普通图片链接(http/https开头的jpg/png等) m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I) if m: return self._proc_url(m.group(1)) \# 匹配CSS背景图(url()格式) if 'url(' in html_content: m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I) if m: return self._proc_url(m.group(1)) # 无图片时返回空 return '' # 处理图片URL:解密base64图片,添加代理,缓存图片内容 def _proc_url(self, url): if not url: return '' url = url.strip('\'" ') # 处理base64格式图片 if url.startswith('data:'): try: # 分割data:image头和base64内容 _, b64_str = url.split(',', 1) raw = b64decode(b64_str) # 解密加密的图片内容 if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')): raw = self.aesimg(raw) # 生成MD5缓存键,缓存图片内容 key = hashlib.md5(raw).hexdigest() img_cache[key] = raw # 返回缓存代理链接 return f"{self.getProxyUrl()}&type=cache&key={key}" except: return "" # 补全相对路径的图片链接 if not url.startswith('http'): url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}" # 返回图片代理链接(加密传输) return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img" # 封装PyQuery解析:兼容编码问题 def getpq(self, data): try: return pq(data) except: return pq(data.encode('utf-8'))