import datetime
import json
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor

import requests
from django.test import TestCase

# Create your tests here.

# Seconds to wait for a connection / for the next bytes before giving up,
# so a stalled server cannot block a worker thread forever.
_REQUEST_TIMEOUT = 30
# Size of the pieces streamed from the network to disk.
_CHUNK_SIZE = 1024 * 1024


def _first_match(pattern, text, what):
    """Return the first captured group of *pattern* found in *text*.

    Raises:
        IndexError: if the pattern does not match. This is the same type
            ``re.findall(pattern, text)[0]`` raises, but with a message
            naming *what* was being looked for.
    """
    match = re.search(pattern, text)
    if match is None:
        raise IndexError(f"could not find {what} in the page")
    return match.group(1)


def _save_stream(url, headers, destination):
    """Stream the resource at *url* into the file *destination*.

    The body is written chunk by chunk so a whole video never has to be held
    in memory (several downloads may run at once in separate threads).
    """
    with requests.get(url, headers=headers, stream=True,
                      timeout=_REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        with open(destination, 'wb') as out:
            for chunk in response.iter_content(chunk_size=_CHUNK_SIZE):
                out.write(chunk)


def download_video(url):
    """Download one Bilibili episode page and merge it into an mp4 file.

    The page at *url* is fetched, the collection title (used as the output
    directory) and the episode title (used as the file name) are scraped from
    the HTML, and the first video and audio stream URLs are read from the
    embedded ``window.__playinfo__`` JSON. Both streams are saved next to
    each other and then merged with the external ``ffmpeg`` program; the two
    intermediate files are removed only when ffmpeg succeeds.

    Args:
        url: Address of the episode page.

    Raises:
        requests.RequestException: on network errors, timeouts or HTTP error
            statuses.
        IndexError: if a title or the play info cannot be found in the page.
        KeyError: if the play info lacks the ``data.dash.video/audio``
            entries read below.
    """
    # NOTE(review): the Cookie below appears to embed real session
    # credentials (SESSDATA, bili_jct); consider loading it from the
    # environment or a local config file instead of keeping it in source.
    headers = {
        "Cookie": "buvid3=C6ED37CC-DC0F-D1B0-BA82-231C0731E3C971724infoc; b_nut=1698137871; _uuid=xxxx-9398-E7CA-10B95-xxxx; buvid_fp=xxxx; buvid4=xxxx-93C3-xxxx-xxxx-0F1D34771D4274275-023102416-aL0NYo%xxxx%3D%3D; header_theme_version=CLOSE; DedeUserID=345707270; DedeUserID__ckMd5=7506c67cb7588c20; enable_web_push=ENABLE; iflogin_when_web_push=1; CURRENT_FNVAL=4048; rpdid=|(kYRk|Ruuk)0J'uYm)~JRmml; home_feed_column=5; PVID=1; FEED_LIVE_VERSION=V8; browser_resolution=1920-908; SESSDATA=0aff21e1%2C1729848907%2Ca2f88%2A42CjDHEfsdfE5mZ9GMKVTmTqG3aIO7dew8YUpjK9-z7OXOdBOYjXPi4FVQgJEVacJ0UQkSVk4xTGRnLTEzOHF3TDktYlhEa2JDS3ZFV0FfYjlHZ3ctdzhlWlVDZmhpUFZsMEJCSTZtQkxUU1FiRC1IV1pMenVFV1JxcVhCc2sxNEtCemgyY1dtQVZBIIEC; bili_jct=768662980741f061aedc30f722129d8b; sid=7tqiav60; bp_t_offset_345707270=925256601212813351; b_lsid=DBC104B55_18F27B3DA65; share_source_origin=COPY; bsource=share_source_copy_link; hit-dyn-v2=1; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTQ2MTczMzMsImlhdCI6MTcxNDM1ODA3MywicGx0IjotMX0.qfWz2oLOuJvDWHCM6Cgwl0SEVjpN6LkOreX8ApoYD4k; bili_ticket_expires=1714617273",
        "Origin": "https://www.bilibili.com",
        "Referer": "https://www.bilibili.com/video/BV1ZR4y1U7Qz?p=2",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    }
    response = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
    response.raise_for_status()
    html = response.text

    # Use the collection name as the directory that holds the videos.
    file_path = _first_match(
        r'data-title="(.*?)" title', html, 'the collection title'
    ).replace(' ', '')
    print(file_path)
    if os.path.exists(file_path):
        print(f"Directory '{file_path}' already exists.")
    else:
        # exist_ok: another worker thread may create the same directory
        # between the check above and this call.
        os.makedirs(file_path, exist_ok=True)
        print(f"Directory '{file_path}' created successfully.")

    # Use the episode name as the file name.
    title = _first_match(
        r'<title data-vue-meta="true">(.*?)_哔哩哔哩_bilibili</title>',
        html,
        'the episode title',
    ).replace(' ', '')
    # A path separator inside the title would point into a directory that
    # does not exist, so neutralise it.
    title = re.sub(r'[\\/]', '_', title)
    print(title)

    # Extract the embedded play information.
    video_info = _first_match(
        r'<script>window.__playinfo__=(.*?)</script>', html, 'the play info'
    )
    print(video_info)
    json_data = json.loads(video_info)

    # The play info lists video and audio separately; take the first of each.
    video_url = json_data['data']['dash']['video'][0]['baseUrl']
    audio_url = json_data['data']['dash']['audio'][0]['baseUrl']
    print(video_url)
    print(audio_url)

    # Save the video and audio streams as .avi and .mp3 files.
    video_file = os.path.join(file_path, title + '.avi')
    audio_file = os.path.join(file_path, title + '.mp3')
    merged_file = os.path.join(file_path, title + '.mp4')
    _save_stream(video_url, headers, video_file)
    _save_stream(audio_url, headers, audio_file)

    # Merge both files with ffmpeg. The arguments are passed as a list (no
    # shell) so characters in the scraped title cannot be interpreted by a
    # shell or split the command.
    cmd = [
        'ffmpeg',
        '-i', video_file,
        '-i', audio_file,
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-strict', 'experimental',
        merged_file,
    ]
    try:
        completed = subprocess.run(cmd)
    except OSError as exc:
        print(f"Could not run ffmpeg ({exc}); "
              f"keeping '{video_file}' and '{audio_file}'.")
        return
    if completed.returncode != 0:
        # Keep the downloads so the merge can be retried by hand.
        print(f"ffmpeg exited with code {completed.returncode}; "
              f"keeping '{video_file}' and '{audio_file}'.")
        return

    os.remove(video_file)
    os.remove(audio_file)
# Crawl with multiple threads (much faster than one episode at a time).
def main(bvid, start, end):
    """Download episodes *start* through *end* (inclusive) of a collection.

    One page URL is built per episode number and each is handed to
    ``download_video`` on a pool of 10 worker threads. Failures of individual
    episodes are printed after all tasks have finished instead of being
    dropped silently.

    Args:
        bvid: Video id as it appears in the video URL.
        start: First episode number to download.
        end: Last episode number to download.
    """
    urls = [f'https://www.bilibili.com/video/{bvid}/?p={i}' for i in range(start, end + 1)]
    print(urls)
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [(url, executor.submit(download_video, url)) for url in urls]
    # Leaving the ``with`` block waits for every task, so each future is
    # finished here and ``exception()`` returns immediately.
    for url, task in tasks:
        error = task.exception()
        if error is not None:
            print(f"Download failed for {url}: {error!r}")


if __name__ == '__main__':
    # To crawl a different collection only bvid has to change; its value can
    # be read from the video address, e.g.
    # https://www.bilibili.com/video/BV1Rs4y127j8/?spm_id_from=333.999.0.0&vd_source=6cdcd08f45ddc987f3f46f8ee8f80b9e
    bvid = 'BV1Sz4y1o7E8'
    starttime = datetime.datetime.now()
    print(starttime)
    # start and end are the first and last episode numbers to crawl; for a
    # collection of 20 episodes start is 1 and end is 20.
    start = 1
    end = 56
    main(bvid, start, end)
    endtime = datetime.datetime.now()
    print(endtime)
    result_time = endtime - starttime
    print(result_time)