【Python爬虫】如何批量获取并下载某博客网站的视频和图片,并根据作者和创作时间分类存储,更新于2025.8.28
分为两种模式:
一种是分后下:【爬取相册按月份分类】后【下载到月份文件夹】。(优点:爬取快,下载全,缺点:无标题信息)。
一种是下后分:【爬取动态页(带标题)】后【下载到单个文件夹】后【可根据时间再分类】。(优点:可下载.mp4和.mov格式,标题信息可以附着到图片,并将日期修改为发布日期,缺点:爬取可能不全)
一、分后下
【爬取相册按月份分类】后【下载到月份文件夹】。(优点:爬取快,下载全,缺点:无标题信息)。
【1.1 Weibo_Home_page_info (long).py】
自动爬相册,效率低但是自动
# 重新整合了结构,加入了网页强制关闭功能,但是下载大量数据后期会很慢
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
# 打网页
def Open_web(her_web):
# 访问网站
user_data_dir = r"C:UsersxxxxxAppDataLocalMicrosoftEdgeUser Data"
options = Options()
options.add_argument(f"user-data-dir={user_data_dir}")
wb = webdriver.Edge(options=options)
# 输入网址
time.sleep(1)
wb.get(her_web)
print('验证')
input()
return wb
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
try:
lis = wb.find_elements(By.CSS_SELECTOR, '#app div.woo-box-item-flex > div > div > div > div > img')
for li in lis:
url = li.get_attribute('src')
note_info = {'aid': '', 'title': '', 'url': url}
# 判别重复
if note_info not in IDs:
print(note_info)
IDs.append(note_info)
print('当前列表 =', len(IDs))
except:
print('元素获取失败')
return IDs
# 滚动页面
def Scroll_page(wb,previous_height,is_height):
# 滚动页面直到不再加载新的元素
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(2) # 等待页面加载
# 计算新高度
new_height = wb.execute_script("return document.body.scrollHeight")
# 检查是否到达页面底部
if new_height == previous_height:
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(5) # 等待页面加载
new_height = wb.execute_script("return document.body.scrollHeight")
if new_height == previous_height:
is_height+=1
print('即将结束采集:', is_height)
else:
is_height=0
return is_height
# 储存列表
def save_list(lists, path, name):
# for li in lists:
# print(li)
# 转变工作目录到指定文件夹
os.chdir(path)
# 创建DataFrame对象
df = pd.DataFrame(lists)
# 保存为Excel文件
pd.DataFrame(lists, columns=['aid', "title", "url"]).
to_excel(os.path.join(file_path,name), index=False)
print('列表已保存:', os.path.join(file_path,name))
# 替换掉不能命名的字符
def replace_x(words):
words = words.replace('/', '')
words = words.replace(':', '')
words = words.replace('*', '')
words = words.replace('?', '')
words = words.replace('〈', '')
words = words.replace('〉', '')
words = words.replace('|', '')
return words
# 创建文件夹
def folder_creation(path, gallery_name):
is_exists = os.path.exists(path + gallery_name) # path是文件夹或者文件的相对路径或者绝对路径
if is_exists:
print('文件夹已存在')
else:
print('文件夹不存在,已创建')
os.mkdir(path + gallery_name)
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
os.chdir(file_path)
df = pd.read_excel(excel_name)
data = df.values.tolist()
Video_lists = []
for video_list in data:
video_dict = {'aid': video_list[0],
'title': video_list[1],
'url': video_list[2]}
Video_lists.append(video_dict)
# print(Video_lists)
print('共收藏了', len(Video_lists), '条记录')
return Video_lists
# 发出提示音
def Sound_beep():
duration = 1000 # millisecond
freq = 440 # Hz
winsound.Beep(freq, duration)
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
# 文件保存位置
file_path = r'D:下载站weibodownload' # 读取列表和下载路径
# 请设置Excel文件名
NvYou = '作者名'
upID=NvYou.split('@')[1]
IDs_Photo = []
# 创建链接
her_web = 'https://weibo.com/u/' + upID + '?tabtype=album'
print('her_web:', her_web)
wb=Open_web(her_web)
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
num_excel=0
previous_height = wb.execute_script("return document.body.scrollHeight")
while True:
IDs_Photo = Get_IDs(wb,IDs_Photo)
if len(IDs_Photo) > 100:
num_excel+=1
save_list(IDs_Photo, file_path, NvYou +'_'+str(num_excel)+ '.xlsx')
print(IDs_Photo)
print('共采集到', len(IDs_Photo), '条作品')
IDs_Photo=[]
print('IDs_Photo=', IDs_Photo)
#开始滚动
is_height=Scroll_page(wb, previous_height,is_height)
if is_height == 2: # 如果滚不动了,弹出判别框
is_end = int(input('是否结束采集(1/0)'))
if is_end == 1:
print(IDs_Photo)
print('共采集到', len(IDs_Photo), '条作品')
break
else:
is_height = 0
Sound_beep()
wb.close()
【1.3 Weibo_Home_page_info (time gallery).py】
相册页手动翻到底,因为网页会一直存储所有URLs,效率高但是费手。还有一个好处是程序会更具月份分类存储爬到的urls,这样更新时就不用下载老的月份,推荐!。
# 手动采集,采集的相册页,通过相册前缀标识区分出时间页面,并分包储存(匹配2.3下载)
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
# 打网页
def Open_web(her_web):
# 访问网站
user_data_dir = r"C:UsersxxxxxxAppDataLocalMicrosoftEdgeUser Data"
options = Options()
options.add_argument(f"user-data-dir={user_data_dir}")
wb = webdriver.Edge(options=options)
# 输入网址
time.sleep(1)
wb.get(her_web)
print('验证')
input()
return wb
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
try:
pages=wb.find_elements(By.CSS_SELECTOR, '.ProfileAlbum_item_2bkJV')
page_num=0#用于记数网页上所能展示的月份页数
for page in pages:#月份循环
page_num+=1
if page_num>len(pages):#如果是获取到的最后一个日期,那信息一定不全,所以不下载
print('page_num=',page_num,'此页不全执行滚动')
return
IDs=[]#若没有跳出循环,则说明此页内容完整,可采集,清空列表后采集当月信息
time1 = page.find_element(By.CSS_SELECTOR, '.ProfileAlbum_y_2dSeV').text
time2 = page.find_element(By.CSS_SELECTOR, '.ProfileAlbum_m_24O4p').text
if time1=='':
time1='2025'
time=time1+'-'+time2
print('正在下载 :', time)
lis = page.find_elements(By.CSS_SELECTOR, 'img')
for li in lis:#月份内图片元素循环
url = li.get_attribute('src')
note_info = {'aid': '', 'title': '', 'url': url}
# 判别重复
if note_info not in IDs:
print(note_info)
IDs.append(note_info)
print(IDs)
print(time,'共采集到', len(IDs), '条作品')
save_list(IDs, file_path, NvYou +'_'+time+ '.xlsx')
except:
print('元素获取失败')
# 滚动页面
def Scroll_page(wb,previous_height,is_height):
# 滚动页面直到不再加载新的元素
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1) # 等待页面加载
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(2) # 等待页面加载
# 计算新高度
new_height = wb.execute_script("return document.body.scrollHeight")
# 检查是否到达页面底部
if new_height == previous_height:
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(5) # 等待页面加载
new_height = wb.execute_script("return document.body.scrollHeight")
if new_height == previous_height:
is_height+=1
print('即将结束采集:', is_height)
else:
is_height=0
return is_height
# 储存列表
def save_list(lists, path, name):
# for li in lists:
# print(li)
# 转变工作目录到指定文件夹
os.chdir(path)
# 创建DataFrame对象
df = pd.DataFrame(lists)
# 保存为Excel文件
pd.DataFrame(lists, columns=['aid', "title", "url"]).
to_excel(os.path.join(file_path,name), index=False)
print('列表已保存:', os.path.join(file_path,name))
# 替换掉不能命名的字符
def replace_x(words):
words = words.replace('/', '')
words = words.replace(':', '')
words = words.replace('*', '')
words = words.replace('?', '')
words = words.replace('〈', '')
words = words.replace('〉', '')
words = words.replace('|', '')
return words
# 创建文件夹
def folder_creation(path, gallery_name):
is_exists = os.path.exists(path + gallery_name) # path是文件夹或者文件的相对路径或者绝对路径
if is_exists:
print('文件夹已存在')
else:
print('文件夹不存在,已创建')
os.mkdir(path + gallery_name)
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
os.chdir(file_path)
df = pd.read_excel(excel_name)
data = df.values.tolist()
Video_lists = []
for video_list in data:
video_dict = {'aid': video_list[0],
'title': video_list[1],
'url': video_list[2]}
Video_lists.append(video_dict)
# print(Video_lists)
print('共收藏了', len(Video_lists), '条记录')
return Video_lists
# 发出提示音
def Sound_beep():
duration = 1000 # millisecond
freq = 440 # Hz
winsound.Beep(freq, duration)
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
# 文件保存位置
file_path = r'D:下载站weibodownload' # 读取列表和下载路径
NvYou = '作者名'
upID=NvYou.split('@')[1]
IDs_Photo = []
# 创建链接
her_web = 'https://weibo.com/u/' + upID + '?tabtype=album'
print('her_web:', her_web)
wb=Open_web(her_web)
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
previous_height = wb.execute_script("return document.body.scrollHeight")
while True:
#手动开始滚动,快速直接滚到底,微博网页信息不回被覆盖
IDs_Photo = Get_IDs(wb, IDs_Photo)
# #自动开始滚动
# Scroll_page(wb, previous_height, is_height)
Sound_beep()
is_end = int(input('(结束采集按1/或继续滚动后按0)'))
if is_end == 1:
break
wb.close()
【2.3 Weibo_Note_Download (time gallery).py】
分类下载到每个月份子文件夹中
# 博下载:通过分包的Excel直接下载到有时间分类的子文件夹中(版本答案,匹配1.3获取信息)
import glob
import os
import winsound
import pandas as pd
import requests
# 获取收藏夹列表Video_lists
def Excel_List(file_path, name):
# 转变工作目录到指定文件夹
os.chdir(file_path)
# 读取 Excel 文件
df = pd.read_excel(name)
# 转换为列表
data = df.values.tolist()
Note_lists = []
for li in data:
video_dict = {'aid': str(li[0]), # 这里不知道为什么突然变成int类型,导致后面匹配的时候识别不到,所以加了str()
'title': li[1],
'url': li[2]}
# print(video_dict)
Note_lists.append(video_dict)
print('共收藏了', len(Note_lists), '条笔记')
print(Note_lists)
# print(type(Note_lists[156]['aid']))
return Note_lists
# 创建文件夹
def folder_creation(path):
is_exists = os.path.exists(path) # path是文件夹或者文件的相对路径或者绝对路径
if is_exists:
print('文件夹已存在')
else:
print('文件夹不存在,已创建')
os.mkdir(path)
# 通过ID获取响应的函数
def request_header(url):
# 请求url
# print(url)
# 加上一个请求头,伪装成浏览器
headers = {
# cookie:用户信息,登录或不登录都有
'cookie': "你的cookie",
'referer': '任意该网站域名',
# user-agent:浏览器信息,版本,电脑
'user-agent': '你的user-agent'
}
response = requests.get(url, headers=headers)
# print(response.text)
return response, headers
# 替换掉不能命名的字符
def replace_x(words):
words = words.replace('/', '')
words = words.replace(':', '')
words = words.replace('*', '')
words = words.replace('?', '')
words = words.replace('〈', '')
words = words.replace('〉', '')
words = words.replace('>', '')
words = words.replace('<', '')
words = words.replace('|', '')
words = words.replace('"', '')
words = words.replace('...', '')
words = words.replace('
', '')
return words
# 下载并保存图片
def download_images(url, save_path):
try:
# 取文件名
filename = url.split('/')[-1]
save_file = os.path.join(save_path, filename)
# 发送请求(使用已有 headers)
(response, headers) = request_header(url)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
with open(save_file, 'wb') as f:
f.write(response.content)
print(f"下载成功: {filename}")
else:
print(f"下载失败(状态码 {response.status_code}): {filename}")
except Exception as e:
print(f"下载出错: {url},错误: {e}")
# 发出提示音
def Sound_beep():
duration = 1000 # millisecond
freq = 440 # Hz
winsound.Beep(freq, duration)
# 主函数
#获取列表
file_path = r'D:下载站weibo'
NvYou='作者名'
download_path=os.path.join(file_path,NvYou)
folder_creation(download_path)
xlsx_path=os.path.join(file_path,'0download',NvYou)
print(xlsx_path)
# 获取所有 .xlsx 文件的完整路径(不包括临时文件和隐藏文件)
xlsx_files = glob.glob(os.path.join(xlsx_path, '*.xlsx'))
# print(xlsx_files)
# 遍历每一个 Excel 文件
for name in xlsx_files:
#选取文件夹
name=name.split('')[-1]
print('正在下载:',name)
note_list=Excel_List(xlsx_path, name)
# 文件保存位置
time_ID=name.split('_')[-1].replace('月.xlsx','')
print(time_ID)
folder_path=os.path.join(download_path,time_ID)
folder_creation(folder_path)
i=0
for note in note_list:
i+=1
print('正在下载(',i,'/',len(note_list),')')
url=note['url']
url=url.replace('orj360', 'large')
print(url)
# 调用下载函数
download_images(url, folder_path)
Sound_beep()
这种方式下的最全最整齐,但是无法获取到视频,也无法将每条动态的标题赋予下载的图片。
二、下后分
爬取动态页(带标题)】后【下载到单个文件夹】后【可更具时间再分类】。(优点:可下载MP4和mov格式,标题信息可以附着到图片,并将日期修改为发布日期,缺点:爬取可能不全)
【1.5 Weibo_Home_page_info (moment).py】
这里爬取的不是相册而是主页动态,会有一些转发内容,也会被一起下载,如果只想要原创内容可在网页的筛选界面手动选择。
# 自动采集,采集的是动态页,采集不全,需要反复确认,,配合(2.5下载)
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
# 打网页
def Open_web(her_web):
# 访问网站
user_data_dir = r"C:UsersxxxxxxAppDataLocalMicrosoftEdgeUser Data"
options = Options()
options.add_argument(f"user-data-dir={user_data_dir}")
wb = webdriver.Edge(options=options)
# 输入网址
time.sleep(1)
wb.get(her_web)
print('验证')
input()
return wb
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
# 从已有 IDs 中提取所有 URL 作为判重列表
url_list = [item['url'] for item in IDs]
try:
lis = wb.find_elements(By.CSS_SELECTOR, '.Feed_body_3R0rO')
for li in lis:
li_time = li.find_element(By.CSS_SELECTOR,
' [class="woo-box-flex woo-box-alignCenter woo-box-justifyCenter head-info_info_2AspQ"] a').get_attribute(
'title')
li_url = li.find_element(By.CSS_SELECTOR,
' [class="woo-box-flex woo-box-alignCenter woo-box-justifyCenter head-info_info_2AspQ"] a').get_attribute(
'href')
# try:
li_title = li.find_element(By.CSS_SELECTOR,'[class="detail_wbtext_4CRf9"]').text
# except:li_title = ''
note_info = {'time': li_time, 'title': li_title, 'url': li_url}
# 判别重复
if note_info['url'] not in url_list:
print(note_info)
IDs.append(note_info)
print('当前列表 =', len(IDs))
except:
print('元素获取失败')
return IDs
# 滚动页面
def Scroll_page(wb, is_height):
# 如果函数属性 previous_height 不存在,就初始化
if not hasattr(Scroll_page, "previous_height"):
Scroll_page.previous_height = wb.execute_script("return document.body.scrollHeight")
# 滚动页面
for _ in range(2):
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(0.1)
time.sleep(2)
# 获取新高度
new_height = wb.execute_script("return document.body.scrollHeight")
if new_height == Scroll_page.previous_height:
wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
time.sleep(5)
new_height = wb.execute_script("return document.body.scrollHeight")
if new_height == Scroll_page.previous_height:
is_height += 1
print('即将结束采集:', is_height)
else:
is_height = 0
else:
is_height = 0
# 更新存储的高度
Scroll_page.previous_height = new_height
return is_height
# 储存列表
def save_list(lists, path, name):
# for li in lists:
# print(li)
# 转变工作目录到指定文件夹
os.chdir(path)
# 创建DataFrame对象
df = pd.DataFrame(lists)
# 保存为Excel文件
pd.DataFrame(lists, columns=['time', "title", "url"]).
to_excel(os.path.join(file_path,name), index=False)
print('列表已保存:', os.path.join(file_path,name))
# 替换掉不能命名的字符
def replace_x(words):
words = words.replace('/', '')
words = words.replace(':', '')
words = words.replace('*', '')
words = words.replace('?', '')
words = words.replace('〈', '')
words = words.replace('〉', '')
words = words.replace('|', '')
return words
# 创建文件夹
def folder_creation(path, gallery_name):
is_exists = os.path.exists(path + gallery_name) # path是文件夹或者文件的相对路径或者绝对路径
if is_exists:
print('文件夹已存在')
else:
print('文件夹不存在,已创建')
os.mkdir(path + gallery_name)
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
os.chdir(file_path)
df = pd.read_excel(excel_name)
data = df.values.tolist()
Video_lists = []
for video_list in data:
video_dict = {'aid': video_list[0],
'title': video_list[1],
'url': video_list[2]}
Video_lists.append(video_dict)
# print(Video_lists)
print('共收藏了', len(Video_lists), '条记录')
return Video_lists
# 发出提示音
def Sound_beep():
duration = 1000 # millisecond
freq = 440 # Hz
winsound.Beep(freq, duration)
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
# 文件保存位置
file_path = r'D:下载站weibodownload' # 读取列表和下载路径
NvYou = '作者名'
upID=NvYou.split('@')[1]
IDs_Photo = []
# 创建链接
her_web = 'https://weibo.com/u/' + upID
print('her_web:', her_web)
wb=Open_web(her_web)
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
while True:
IDs_Photo = Get_IDs(wb, IDs_Photo)
is_height=Scroll_page(wb, is_height)
if is_height == 2: # 如果滚不动了,弹出判别框
Sound_beep()
is_end = int(input('是否结束采集(1/0)'))
if is_end == 1:
print(IDs_Photo)
print('共采集到', len(IDs_Photo), '条作品')
save_list(IDs_Photo, file_path, NvYou + '.xlsx')
break
else:
is_height = 0
wb.close()
【2.5 Weibo_Note_Download (moment).py】
这里下载的内容包括了.jpg, .mp4, .mov,但是全部放在根文件,如果还想按照月份分类的话也可以,因为这里下载下来已经把文件时间修改为了动态的发布时间。
# 微博下载:获取到的是整条微博信息,可能包含若干图片,命名方式以原名称+title形式,并更改其日期
# 导入时间模块
import time
from datetime import datetime
import os
import re
import winsound
import pandas as pd
import requests
from sympy.codegen.ast import continue_
# 获取收藏夹列表Video_lists
def Excel_List(file_path, name):
# 转变工作目录到指定文件夹
os.chdir(file_path)
# 读取 Excel 文件
df = pd.read_excel(name)
# 转换为列表
data = df.values.tolist()
Note_lists = []
for li in data:
video_dict = {'time': str(li[0]), # 这里不知道为什么突然变成int类型,导致后面匹配的时候识别不到,所以加了str()
'title': li[1],
'url': li[2]}
# print(video_dict)
Note_lists.append(video_dict)
print('共收藏了', len(Note_lists), '条笔记')
# print(Note_lists)
return Note_lists
# 创建文件夹
def folder_creation(path):
is_exists = os.path.exists(path) # path是文件夹或者文件的相对路径或者绝对路径
if is_exists:
print('文件夹已存在')
else:
print('文件夹不存在,已创建')
os.mkdir(path)
# 通过ID获取响应的函数
def request_header(url):
# 请求url
# print(url)
# 加上一个请求头,伪装成浏览器
headers = {
# cookie:用户信息,登录或不登录都有
'cookie': "你的cookie",
'referer': '任意该网站网址',
# user-agent:浏览器信息,版本,电脑
'user-agent': '你的user-agent'
}
response = requests.get(url, headers=headers)
# print(response.text)
return response, headers
def find_urls(response):
url_list = []
# 第一步:匹配 pic_ids 方括号内的内容
try:
ids_block = re.findall(r'"pic_ids":s*[([sS]*?)],', response.text, re.S)
# 第二步:从捕获的方括号内容里提取每个 ID
if ids_block: # 确保有匹配结果
ids_list = re.findall(r'"([0-9a-zA-Z]{32})"', ids_block[0])
# 第三步:从捕获的ID中寻找完整的url
for pic_id in ids_list:
url=re.findall(rf'"url":s*"([^"]*large[^"]*{pic_id}[^"]*)"', response.text)[0]
if url not in url_list:
# print(url)
url_list.append(url)
except:
pass
# 第四步:查看是否有视频信息
try:
ids_block=re.findall('"videoSrc":s*"(.*?)"', response.text)
if ids_block:
print('多个视频')
for url in ids_block:
# print(url)
url_list.append(url)
except:
pass
# 第五步:查看是否有多个视频信息
try:
url = re.findall('"mp4_720p_mp4":s*"(.*?)"', response.text)[0]
if url :
print('单个视频')
url_list.append(url)
except:
pass
print('这条微博共采集到',len(url_list),'个域名:',url_list)
return url_list
# 下载并保存图片
def download_images(url, save_path,title):
filename=''
try:
# 取文件名
if '.jpg' in url:
if type(title)==str:
filename = url.split('.jpg')[0].split('/')[-1]+'-'+title+'.jpg'
else:
filename = url.split('/')[-1]
elif 'video' in url:
if 'mov' in url:
filename = url.split('%')[-1].replace('.mov','') +'-'+title+'.mov'
else:
filename = url.split('?')[0].replace('.mp4', '').split('/')[-1] +'-'+title+'.mp4'
# print('正在下载文件:',filename)
save_file = os.path.join(save_path, filename)
# 发送请求(使用已有 headers)
(response, headers) = request_header(url)
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
with open(save_file, 'wb') as f:
f.write(response.content)
print(f"下载成功: {filename}")
else:
print(f"下载失败(状态码 {response.status_code}): {filename}")
except Exception as e:
print(f"下载出错: {url},错误: {e}")
return os.path.join(save_path,filename)
# 修改时间的函数
def modify_time(whole_file_path, release_time):
release_time=release_time+':00'
print(release_time)
a = datetime.strptime(release_time, '%Y-%m-%d %H:%M:%S')
a.timestamp()
os.utime(whole_file_path, (a.timestamp(), a.timestamp())) # 只能修改 访问时间 与 修改时间(不能修改创建时间)
# 替换掉不能命名的字符
def replace_x(filename: str, replacement: str = '_') -> str:
"""
将任意字符串转换为合法的 Windows 文件名
filename: 原始字符串
replacement: 替换非法字符的字符,默认 '_'
"""
# 1. 替换 Windows 不允许的字符
illegal_chars = r'[<>:"/|?*
]'
filename = re.sub(illegal_chars, replacement, filename)
# 2. 替换连续多个替换字符为单个
filename = re.sub(f'{re.escape(replacement)}+', replacement, filename)
# 3. 去掉开头和结尾的空格和点
filename = filename.strip(' .')
# 4. 可选:去掉连续的三个点 '...'
filename = filename.replace('...', replacement)
return filename
# 发出提示音
def Sound_beep():
duration = 1000 # millisecond
freq = 440 # Hz
winsound.Beep(freq, duration)
# 主函数
#获取列表
file_path = r'D:下载站weibo'
NvYou='作者名'
download_path=os.path.join(file_path,NvYou)
folder_creation(download_path)
xlsx_path=os.path.join(file_path,'0download')
note_list=Excel_List(xlsx_path, NvYou+'.xlsx')
# 文件保存位置
i=0
for note in note_list:
i+=1
print('正在下载(',i,'/',len(note_list),')')
url=note['url']
title = note['title']
release_time=note['time']
if type(title)==str:
title=replace_x(title)
# url='https://weibo.com/5532569093/PBvWfstnU' #视频测试
# url='https://weibo.com/2028473200/PFPdSygRA' #多图测试
# url='https://weibo.com/5681691748/PzfeMp3Jr' #多视频测试
# title='长时间哦ID放假哦'
print(url)
#解析url中的response
(response, header)= request_header(url)
#解析url中的图片域名
photo_urls=find_urls(response)
for url in photo_urls:
# 调用下载函数
whole_img_path=download_images(url, download_path,title)
# time.sleep(0.5)
# 更正时间
try:
modify_time(whole_img_path, release_time)
except:
Sound_beep()
print('更新时间失败')
【3.5 Sort out by time (moment).py】
如果还想按时间分类到月份子文件夹,可以执行这个代码
import os
import shutil
from datetime import datetime
# 图片所在文件夹
source_folder = r"D:下载站视频"
# 遍历文件夹中的所有文件
for filename in os.listdir(source_folder):
file_path = os.path.join(source_folder, filename)
# 只处理图片文件
if os.path.isfile(file_path) and filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp','mp4','mov')):
# 获取文件的创建时间(也可以改为修改时间 os.path.getmtime)
# timestamp = os.path.getctime(file_path)
timestamp = os.path.getmtime(file_path)
file_date = datetime.fromtimestamp(timestamp)
# 格式化成 "YYYY-M" 形式
folder_name = f"{file_date.year}-{file_date.month}"
target_folder = os.path.join(source_folder, folder_name)
# 创建子文件夹(如果不存在)
os.makedirs(target_folder, exist_ok=True)
# 移动文件到对应子文件夹
shutil.move(file_path, os.path.join(target_folder, filename))
print("图片分类完成!")
下后分的有点就是更全的文件信息,但是因为爬取动态的过程可能会遗漏部分未显示的动态,所以不是最全的,我一般会把两种方法结合在一起使用,也就是下两遍,至于重复文件,可以通过对比aid来删除。下面介绍。
三、若是两种方法都使用,可对比aid删除重复文件
【4.0 duplicate_Image_compare_aid (all).py】
import os
import hashlib
import shutil
def collect_aid(folder):
aids=[]
for root, _, files in os.walk(folder):
for filename in files:
file_path = os.path.join(root, filename)
aid=filename.split('-')[0]
if os.path.isfile(file_path):
aids.append(aid)
return aids
def move_duplicates_between_folders(folder1, folder2, destination_folder):
"""
递归比较 folder1 和 folder2 中的文件,
如果 folder2 中的文件与 folder1 中的文件相同,则移动到 destination_folder。
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
# 收集 folder1 的所有文件哈希
aid_folder1 = collect_aid(folder1)
print(aid_folder1)
# 检查 folder2 的所有文件(倒序处理)
for root, _, files in os.walk(folder2):
for filename in sorted(files, key=lambda x: x.lower(), reverse=True):
file_path = os.path.join(root, filename)
aid2=filename.split('.')[0]
if aid2 in aid_folder1:
# 目标文件路径(保持原文件名)
dest_path = os.path.join(destination_folder, filename)
print(f"发现重复文件:{file_path},移动到 {dest_path}")
shutil.move(file_path, dest_path)
# 路径
path1 = r'D:下载站' # 对照文件夹(递归扫描)
path2 = r'D:中转站' # 待处理文件夹(递归扫描)
path3 = r'C:Users86158Desktop重复图片' # 存放重复文件
# 执行
move_duplicates_between_folders(path1, path2, path3)
















暂无评论内容