This example exercises your understanding of multithreading along with basic Redis and MongoDB usage. Straight to the code:
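The third-party dependencies, matching the imports in the listing below, install with:

pip install requests redis pymongo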
# Architecture: multithreaded fetching and parsing; storage deduplicates with
# Redis, then writes to MongoDB (coroutines went unused because aioredis is
# incompatible with my Python 3.12)
import threading
import requests
from queue import Queue
import re
import hashlib
import redis
import pymongo
lst = []  # shared list that consumer threads fill with parsed records
# Fetching and parsing
class GetData:
    def __init__(self, url):
        self.url = url
        self.cookies = {
            # your own login cookies go here
        }
        self.headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'origin': 'https://search.bilibili.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': 'https://search.bilibili.com/all?keyword=%E8%81%8C%E5%9C%BA+Excel+%E6%8A%80%E8%83%BD%E9%9B%B6%E5%9F%BA%E7%A1%80%E5%85%A5%E9%97%A8&from_source=webtop_search&spm_id_from=333.1007&search_source=3&page=3&o=60',
            'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
            # 'cookie': "buvid3=2B44DC1A-080F-C670-424C-551A296210B556925infoc; b_nut=1756901556; _uuid=4E276336-3A5B-C6E4-A7106-F5791010F2D1E857693infoc; buvid4=4D96BBF6-5D73-41F8-FD7D-CE5A10E586AE57749-025090320-ZO9cCl+QtFzEk1lpG6elJg%3D%3D; buvid_fp=d88839994b1f3d2be5075a05f7de8d26; rpdid=|(u)YJmu|)R|0J'u~lYm|ull); DedeUserID=1272214378; DedeUserID__ckMd5=d6bcf71f18e8c20f; CURRENT_QUALITY=80; theme-tip-show=SHOWED; theme-avatar-tip-show=SHOWED; enable_web_push=DISABLE; home_feed_column=5; bsource=search_bing; browser_resolution=1528-779; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NjUzMzkzODQsImlhdCI6MTc2NTA4MDEyNCwicGx0IjotMX0.03_BGQJxlOuPaRgHVefFeM_d3EG1iGpp3VNEruPNspU; bili_ticket_expires=1765339324; SESSDATA=5fb0e11b%2C1780632184%2C2667c%2Ac2CjCkiWMQ2yTJxN-a4V0dd3FIFw5Wi_lIjHz4j3qj2KICH7B4JG_PM78cLU7DOH15dZoSVkhGdk4tOXYyckdNU0lLaXhIU2lWVnJYdmNoN1oybHNpWUJFT251M3dTY0k1X3psNWlianV4VTBqbzdRdURsMXVSNHljcms2c0FGZnVNaklNODA4NVdRIIEC; bili_jct=34ecd1402df44da134f679d40c5ee972; sid=8irs9yfx; CURRENT_FNVAL=2000; bp_t_offset_1272214378=1143499828123140096; b_lsid=52F26D93_19AF8EA2B33",
        }
        self.p_queue = Queue()  # pages waiting to be fetched
        self.c_queue = Queue()  # raw JSON responses waiting to be parsed
    # Fetch pages of search results
    def producer(self, key):
        while True:
            # p_queue is fully loaded before the threads start, so an
            # empty queue really means there is no work left
            if not self.p_queue.empty():
                param_page = self.p_queue.get()
                params = {
                    'category_id': '',
                    'search_type': 'video',
                    'ad_resource': '5654',
                    '__refresh__': 'true',
                    '_extra': '',
                    'context': '',
                    'page': f'{param_page}',
                    'page_size': '42',
                    'pubtime_begin_s': '0',
                    'pubtime_end_s': '0',
                    'from_source': '',
                    'from_spmid': '333.337',
                    'platform': 'pc',
                    'highlight': '1',
                    'single_column': '0',
                    'keyword': key,
                    'qv_id': 'D96cvvwyYwvyw46fu2JOTvQUVpO6PQHb',
                    'source_tag': '3',
                    'gaia_vtoken': '',
                    'dynamic_offset': '60',
                    'web_roll_page': '1',
                    'web_location': '1430654',
                    # w_rid/wts form Bilibili's wbi request signature, copied
                    # here from a captured request; they go stale over time
                    # and need re-capturing
                    'w_rid': '18558aec89aecfb87c1ca0ba28533952',
                    'wts': '1765114235',
                }
                response = requests.get(
                    self.url,
                    params=params,
                    cookies=self.cookies,
                    headers=self.headers,
                )
                # print(response.json())
                self.c_queue.put(response.json())
            else:
                print('fetching finished')
                break
    # Parse the fetched responses
    def consumer(self):
        while True:
            # consumers start only after every producer has joined, so an
            # empty queue means everything has been parsed
            if not self.c_queue.empty():
                data = self.c_queue.get()['data']['result']
                # parse each result entry
                for i in data:
                    # the title field carries <em class="keyword"> highlight
                    # markup; the regex captures highlighted and plain runs
                    # (two groups, so each match is a 2-tuple)
                    titles = i['title']
                    title_elems = re.findall(
                        r'<em class="keyword">(.*?)</em>|([^<]+)', titles, re.DOTALL)
                    title_elem_lst = []
                    for elems in title_elems:
                        if elems[0]:    # text inside an <em> tag
                            title_elem_lst.append(elems[0])
                        elif elems[1]:  # plain text outside <em> tags
                            title_elem_lst.append(elems[1])
                    title = ''.join(title_elem_lst)
                    # url, desc, author, img_url
                    url = i['arcurl']
                    description = i['description']
                    author = i['author']
                    cover_image = 'https:' + i['pic']
                    lst.append({
                        'title': title,
                        'url': url,
                        'description': description,
                        'author': author,
                        'cover_image': cover_image
                    })
            else:
                print('parsing finished...')
                break
    def main(self):
        input_key = input('Enter the title keyword to crawl: ')
        print('crawler starting...')
        for page in range(1, 10):
            self.p_queue.put(page)
        # create producer threads
        producer_threading = [threading.Thread(target=self.producer, args=(input_key,)) for _ in range(8)]
        # start them
        for pt in producer_threading:
            pt.start()
        # main thread waits for all producers to finish
        for pt_join in producer_threading:
            pt_join.join()
        # create consumer threads (only after every response is queued)
        consumer_threading = [threading.Thread(target=self.consumer) for _ in range(5)]
        for ct in consumer_threading:
            ct.start()
        for ct_join in consumer_threading:
            ct_join.join()
        return input_key
# Dedup and storage
class SaveData:
    def __init__(self):
        self.client_syn = None
        self.mongo_client = None
        self.key_name = 'B_file'  # Redis set key
        self.unique_lst = []
    def mongo_init(self):
        # initialize the MongoDB client
        self.mongo_client = pymongo.MongoClient()
    # initialize the Redis client
    def redis_init(self):
        self.client_syn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
    def del_same_data(self):
        print('deduplicating...')
        for class_data in lst:
            md5_url = hashlib.md5(class_data['url'].encode('utf-8')).hexdigest()
            # print(md5_url)
            if not self.client_syn.sismember(self.key_name, md5_url):
                self.unique_lst.append(class_data)
                self.client_syn.sadd(self.key_name, md5_url)
        print('------------------ dedup finished ----------------------')
        print(self.unique_lst)
    # write into MongoDB
    def save_data(self, input_class):
        self.mongo_init()
        print('writing to MongoDB...')
        # input_class = input('')
        db_client = self.mongo_client['b_class'][f'{input_class}']
        if self.unique_lst:  # insert_many raises on an empty document list
            db_client.insert_many(self.unique_lst)
        print('storage finished...')
    def main(self, input_key):
        # initialize Redis
        self.redis_init()
        # deduplicate
        self.del_same_data()
        # store
        self.save_data(input_key)
if __name__ == '__main__':
    # multithreaded fetching and parsing
    get_data = GetData('https://api.bilibili.com/x/web-interface/wbi/search/type')
    main_spider = get_data.main()
    # print(lst)
    # dedup and storage
    save_data = SaveData()
    save_data.main(main_spider)
    print('program finished...')
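A side note on SaveData.del_same_data before wrapping up: sismember followed by sadd costs two Redis round trips per URL, and the check-then-add pair is not atomic. Since SADD returns 1 only when the member is actually new, a single call can do both jobs; a minimal drop-in sketch of that variant (same fields as above):

    def del_same_data(self):
        print('deduplicating...')
        for class_data in lst:
            md5_url = hashlib.md5(class_data['url'].encode('utf-8')).hexdigest()
            # sadd reports how many members were actually added: 1 means the
            # URL is new, 0 means it was recorded on an earlier run
            if self.client_syn.sadd(self.key_name, md5_url):
                self.unique_lst.append(class_data)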
Summary
This code is nowhere near its limit: the dedup and storage steps could be rewritten as async coroutines, which would be faster still. Give it a try!
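For reference, here is a minimal sketch of that async variant. It assumes redis-py 4.2+ (its built-in redis.asyncio module superseded the abandoned aioredis package and works on Python 3.12) plus the motor driver for async MongoDB access; the dedup_and_store helper name is hypothetical, not part of the code above.

import asyncio
import hashlib

import redis.asyncio as aioredis
from motor.motor_asyncio import AsyncIOMotorClient

async def dedup_and_store(records, collection_name):
    # same servers and key name as the synchronous version above
    r = aioredis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
    coll = AsyncIOMotorClient()['b_class'][collection_name]
    unique = []
    for record in records:
        md5_url = hashlib.md5(record['url'].encode('utf-8')).hexdigest()
        # sadd returns 1 only for a brand-new member, so a single round
        # trip does both the membership test and the insert
        if await r.sadd('B_file', md5_url):
            unique.append(record)
    if unique:
        await coll.insert_many(unique)
    await r.aclose()  # redis-py >= 5; on 4.x call close() instead

# usage: asyncio.run(dedup_and_store(lst, main_spider))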