admin 管理员组

文章数量: 1184232

本文通过一个案例来检验对多线程的理解,并练习 Redis 和 MongoDB 的基本用法。直接上代码:

# 架构:多线程爬取与处理,存储过程用redis去重然后存入MongoDB中(协程没用上,我的py3.12不兼容aioredis)
import hashlib
import re
import threading
from queue import Empty, Queue

import pymongo
import redis
import requests

# Module-level accumulator shared by both stages: GetData.consumer appends one
# dict per parsed search hit, and SaveData.del_same_data iterates over it.
lst: list[dict] = []
# 数据抓取与处理
class GetData:
    """Download Bilibili video-search pages with threads and parse the hits.

    Page numbers are queued on ``p_queue``; producer threads fetch each page
    and put the decoded JSON on ``c_queue``; consumer threads flatten every
    search hit into a dict and append it to the module-level ``lst``.
    """

    # Pages requested by main() and the size of each worker pool.
    PAGES = range(1, 10)
    PRODUCER_THREADS = 8
    CONSUMER_THREADS = 5
    # Seconds to wait for the server, so a stalled connection cannot hang a
    # worker thread (and therefore main()) forever.
    REQUEST_TIMEOUT = 10

    # Splits a title into highlighted fragments (group 1) and plain text
    # outside tags (group 2).
    _TITLE_PART_RE = re.compile(
        r'<em class="keyword">(.*?)</em>|([^<]+)', re.DOTALL)

    def __init__(self, url):
        """Store the endpoint and prepare cookies, headers and work queues.

        Args:
            url: Search API endpoint every page request is sent to.
        """
        self.url = url
        self.cookies = {
            # Fill in the cookies of your own logged-in session.
        }

        # Browser-like headers captured from a real search request.
        # The origin/referer hosts are written with their ".com" suffix;
        # "search.bilibili" on its own is not a resolvable host name.
        # Never paste a real session cookie (SESSDATA, bili_jct, ...) into
        # this file: those values are credentials. Load them from the
        # environment or an untracked config file into self.cookies instead.
        self.headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'origin': 'https://search.bilibili.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': 'https://search.bilibili.com/all?keyword=%E8%81%8C%E5%9C%BA+Excel+%E6%8A%80%E8%83%BD%E9%9B%B6%E5%9F%BA%E7%A1%80%E5%85%A5%E9%97%A8&from_source=webtop_search&spm_id_from=333.1007&search_source=3&page=3&o=60',
            'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
        }

        self.p_queue = Queue()  # page numbers waiting to be fetched
        self.c_queue = Queue()  # decoded JSON payloads waiting to be parsed

    @staticmethod
    def _clean_title(raw_title):
        """Return ``raw_title`` with the ``<em class="keyword">`` markup removed."""
        # Each match is a 2-tuple with at most one non-empty group. An empty
        # highlight (<em class="keyword"></em>) yields ('', ''), which the
        # original code turned into an IndexError by probing a third group.
        return ''.join(
            highlighted or plain
            for highlighted, plain in GetData._TITLE_PART_RE.findall(raw_title)
        )

    @staticmethod
    def _extract_results(payload):
        """Return the list under ``payload['data']['result']``, or ``[]``."""
        # Error or empty responses may lack these keys; treating them as
        # "no hits" keeps the consumer thread alive.
        if not isinstance(payload, dict):
            return []
        data = payload.get('data')
        if not isinstance(data, dict):
            return []
        results = data.get('result')
        return results if isinstance(results, list) else []

    @staticmethod
    def _parse_item(item):
        """Flatten one search hit into the record that is stored later.

        Args:
            item: One element of the ``result`` list; must contain the keys
                ``title``, ``arcurl``, ``description``, ``author``, ``pic``.

        Returns:
            A dict with ``title``, ``url``, ``description``, ``author`` and
            ``cover_image``.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        pic = item['pic']
        # Presumably the API returns protocol-relative URLs ("//host/...") -
        # confirm. Only those get the scheme prepended; a value that already
        # has one is kept as-is instead of becoming "https:http://...".
        cover_image = 'https:' + pic if pic.startswith('//') else pic
        return {
            'title': GetData._clean_title(item['title']),
            'url': item['arcurl'],
            'description': item['description'],
            'author': item['author'],
            'cover_image': cover_image,
        }

    # Fetch data
    def producer(self, key):
        """Fetch every queued page for ``key`` and queue the decoded JSON.

        Runs until ``p_queue`` is drained. A page whose request or JSON
        decoding fails is reported and skipped.

        Args:
            key: Search keyword sent as the ``keyword`` query parameter.
        """
        while True:
            try:
                # get_nowait() checks and takes in one atomic step. The old
                # "if not empty(): get()" let another thread take the last
                # item in between, leaving this one blocked in get() forever.
                param_page = self.p_queue.get_nowait()
            except Empty:
                print('爬取结束')
                break

            # NOTE(review): w_rid/wts look like a signature captured from one
            # browser request; presumably it does not match other pages or
            # keywords - confirm whether it must be recomputed per request.
            params = {
                'category_id': '',
                'search_type': 'video',
                'ad_resource': '5654',
                '__refresh__': 'true',
                '_extra': '',
                'context': '',
                'page': f'{param_page}',
                'page_size': '42',
                'pubtime_begin_s': '0',
                'pubtime_end_s': '0',
                'from_source': '',
                'from_spmid': '333.337',
                'platform': 'pc',
                'highlight': '1',
                'single_column': '0',
                'keyword': key,
                'qv_id': 'D96cvvwyYwvyw46fu2JOTvQUVpO6PQHb',
                'source_tag': '3',
                'gaia_vtoken': '',
                'dynamic_offset': '60',
                'web_roll_page': '1',
                'web_location': '1430654',
                'w_rid': '18558aec89aecfb87c1ca0ba28533952',
                'wts': '1765114235',
            }
            try:
                response = requests.get(
                    self.url,
                    params=params,
                    cookies=self.cookies,
                    headers=self.headers,
                    timeout=self.REQUEST_TIMEOUT,
                )
                payload = response.json()
            except (requests.RequestException, ValueError) as exc:
                # Network failure or a non-JSON body: skip this page but keep
                # the thread working on the remaining ones.
                print(f'第{param_page}页抓取失败: {exc}')
                continue
            self.c_queue.put(payload)

    # Parse data
    def consumer(self):
        """Parse every queued payload and append its records to ``lst``.

        Runs until ``c_queue`` is drained; payloads without a result list
        contribute nothing.
        """
        while True:
            try:
                # Atomic take, for the same reason as in producer().
                payload = self.c_queue.get_nowait()
            except Empty:
                print('数据解析结束...')
                break
            for item in self._extract_results(payload):
                lst.append(self._parse_item(item))

    def main(self):
        """Prompt for a keyword, run the fetch and parse stages, return the keyword.

        All producer threads are joined before any consumer thread starts,
        so parsing begins only after downloading has finished.

        Returns:
            The keyword typed by the user.
        """
        input_key = input('请输入想要爬取信息的标题:')
        print('爬虫开始...')
        for page in self.PAGES:
            self.p_queue.put(page)

        # Create, start and wait for the producer (download) threads.
        producer_threading = [
            threading.Thread(target=self.producer, args=(input_key,))
            for _ in range(self.PRODUCER_THREADS)
        ]
        for pt in producer_threading:
            pt.start()
        for pt_join in producer_threading:
            pt_join.join()

        # Create, start and wait for the consumer (parsing) threads.
        consumer_threading = [
            threading.Thread(target=self.consumer)
            for _ in range(self.CONSUMER_THREADS)
        ]
        for ct in consumer_threading:
            ct.start()
        for ct_join in consumer_threading:
            ct_join.join()

        return input_key

# 数据去重与存储
class SaveData:
    """De-duplicate scraped records via a Redis set and store new ones in MongoDB.

    Records are read from the module-level ``lst``. The MD5 of each record's
    ``url`` is the de-duplication key, kept in the Redis set ``key_name``.
    """

    def __init__(self):
        """Create an unconnected instance; clients are opened on demand."""
        self.client_syn = None      # redis.Redis client, set by redis_init()
        self.mongo_client = None    # pymongo.MongoClient, set by mongo_init()
        self.key_name = 'B_file'    # name of the Redis set holding seen hashes
        self.unique_lst = []        # records whose URL was not seen before

    def mongo_init(self):
        """Create the MongoDB client with PyMongo's default connection settings."""
        self.mongo_client = pymongo.MongoClient()

    def redis_init(self):
        """Create the Redis client for 127.0.0.1:6379 with string responses."""
        self.client_syn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

    def del_same_data(self):
        """Collect into ``unique_lst`` every record of ``lst`` with an unseen URL.

        Requires ``redis_init()`` to have been called.
        NOTE: hashes are recorded in Redis before anything is written to
        MongoDB, so a record whose later insert fails is still treated as
        seen on the next run.
        """
        print('开始去重...')
        for class_data in lst:
            md5_url = hashlib.md5(class_data['url'].encode('utf-8')).hexdigest()
            # SADD returns how many members were actually added (1 = new,
            # 0 = already present), so one atomic call replaces the former
            # SISMEMBER + SADD pair and halves the round-trips.
            if self.client_syn.sadd(self.key_name, md5_url):
                self.unique_lst.append(class_data)
        print('------------------去重结束----------------------')
        print(self.unique_lst)

    def save_data(self, input_class):
        """Insert ``unique_lst`` into collection ``input_class`` of database ``b_class``.

        Does nothing (and opens no connection) when there are no new records.

        Args:
            input_class: Collection name; converted to ``str`` before use.
        """
        if not self.unique_lst:
            # insert_many() rejects an empty document list, which happens
            # whenever every scraped URL was already recorded in Redis.
            print('没有新数据需要存储...')
            return
        self.mongo_init()
        print('开始存入mongoDB...')
        db_client = self.mongo_client['b_class'][f'{input_class}']
        db_client.insert_many(self.unique_lst)
        print('数据存储结束...')

    def main(self, input_key):
        """Run the whole pipeline: connect to Redis, de-duplicate, store.

        Args:
            input_key: Search keyword, used as the MongoDB collection name.
        """
        self.redis_init()
        self.del_same_data()
        self.save_data(input_key)

if __name__ == '__main__':
    # Stage 1: threaded crawl and parse. main() returns the search keyword.
    # The host is written with its ".com" suffix; "api.bilibili" on its own
    # is not a resolvable host name.
    get_data = GetData('https://api.bilibili.com/x/web-interface/wbi/search/type')
    main_spider = get_data.main()
    # Stage 2: de-duplicate and store; the keyword names the Mongo collection.
    save_data = SaveData()
    save_data.main(main_spider)

    print('程序结束...')

小结

这份代码还没有优化到极限:可以把去重和存储改写成异步协程,这样会更快。大家可以试试,加油!

本文标签: 多线程 信息 视频