【Python爬虫】如何批量获取并下载某博客网站的视频和图片,并根据作者和创作时间分类存储,更新于2025.8.28

【Python爬虫】如何批量获取并下载某博客网站的视频和图片,并根据作者和创作时间分类存储,更新于2025.8.28

分为两种模式:

一种是分后下:【爬取相册按月份分类】后【下载到月份文件夹】。(优点:爬取快,下载全,缺点:无标题信息)。

一种是下后分:【爬取动态页(带标题)】后【下载到单个文件夹】后【可根据时间再分类】。(优点:可下载.mp4和.mov格式,标题信息可以附着到图片,并将日期修改为发布日期,缺点:爬取可能不全)

一、分后下

【爬取相册按月份分类】后【下载到月份文件夹】。(优点:爬取快,下载全,缺点:无标题信息)。

【1.1 Weibo_Home_page_info (long).py】

自动爬相册,效率低但是自动



# 重新整合了结构,加入了网页强制关闭功能,但是下载大量数据后期会很慢
 
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
 
 
# 打网页
def Open_web(her_web):
    # 访问网站
    user_data_dir = r"C:UsersxxxxxAppDataLocalMicrosoftEdgeUser Data"
    options = Options()
    options.add_argument(f"user-data-dir={user_data_dir}")
    wb = webdriver.Edge(options=options)
    # 输入网址
    time.sleep(1)
    wb.get(her_web)
    print('验证')
    input()
    return wb
 
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
    try:
        lis = wb.find_elements(By.CSS_SELECTOR, '#app  div.woo-box-item-flex > div > div > div > div > img')
        for li in lis:
            url = li.get_attribute('src')
            note_info = {'aid': '', 'title': '', 'url': url}
 
            # 判别重复
            if note_info not in IDs:
                print(note_info)
                IDs.append(note_info)
 
        print('当前列表 =', len(IDs))
    except:
        print('元素获取失败')
 
    return IDs
 
# 滚动页面
def Scroll_page(wb,previous_height,is_height):
    # 滚动页面直到不再加载新的元素
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(2)  # 等待页面加载
    # 计算新高度
    new_height = wb.execute_script("return document.body.scrollHeight")
 
    # 检查是否到达页面底部
    if new_height == previous_height:
        wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
        time.sleep(5)  # 等待页面加载
        new_height = wb.execute_script("return document.body.scrollHeight")
        if new_height == previous_height:
            is_height+=1
            print('即将结束采集:', is_height)
        else:
            is_height=0
 
    return is_height
 
 
# 储存列表
def save_list(lists, path, name):
    # for li in lists:
    #     print(li)
    # 转变工作目录到指定文件夹
    os.chdir(path)
    # 创建DataFrame对象
    df = pd.DataFrame(lists)
    # 保存为Excel文件
    pd.DataFrame(lists, columns=['aid', "title", "url"]). 
        to_excel(os.path.join(file_path,name), index=False)
    print('列表已保存:', os.path.join(file_path,name))
 
 
# 替换掉不能命名的字符
def replace_x(words):
    words = words.replace('/', '')
    words = words.replace(':', '')
    words = words.replace('*', '')
    words = words.replace('?', '')
    words = words.replace('〈', '')
    words = words.replace('〉', '')
    words = words.replace('|', '')
    return words
 
# 创建文件夹
def folder_creation(path, gallery_name):
    is_exists = os.path.exists(path + gallery_name)  # path是文件夹或者文件的相对路径或者绝对路径
    if is_exists:
        print('文件夹已存在')
 
    else:
        print('文件夹不存在,已创建')
        os.mkdir(path + gallery_name)
 
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
    os.chdir(file_path)
    df = pd.read_excel(excel_name)
    data = df.values.tolist()
    Video_lists = []
    for video_list in data:
        video_dict = {'aid': video_list[0],
                      'title': video_list[1],
                      'url': video_list[2]}
        Video_lists.append(video_dict)
    # print(Video_lists)
    print('共收藏了', len(Video_lists), '条记录')
    return Video_lists
 
# 发出提示音
def Sound_beep():
    duration = 1000  # millisecond
    freq = 440  # Hz
    winsound.Beep(freq, duration)
 
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
 
# 文件保存位置
file_path = r'D:下载站weibodownload'  # 读取列表和下载路径
 
# 请设置Excel文件名
NvYou = '作者名'
upID=NvYou.split('@')[1]
 
IDs_Photo = []
 
# 创建链接
her_web = 'https://weibo.com/u/' + upID + '?tabtype=album'
print('her_web:', her_web)
wb=Open_web(her_web)
 
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
num_excel=0
previous_height = wb.execute_script("return document.body.scrollHeight")
while True:
    IDs_Photo = Get_IDs(wb,IDs_Photo)
 
    if len(IDs_Photo) > 100:
        num_excel+=1
        save_list(IDs_Photo, file_path, NvYou +'_'+str(num_excel)+ '.xlsx')
        print(IDs_Photo)
        print('共采集到', len(IDs_Photo), '条作品')
        IDs_Photo=[]
        print('IDs_Photo=', IDs_Photo)
 
    #开始滚动
    is_height=Scroll_page(wb, previous_height,is_height)
    if is_height == 2:  # 如果滚不动了,弹出判别框
        is_end = int(input('是否结束采集(1/0)'))
        if is_end == 1:
            print(IDs_Photo)
            print('共采集到', len(IDs_Photo), '条作品')
            break
        else:
            is_height = 0
 
 
Sound_beep()
wb.close()

【1.3 Weibo_Home_page_info (time gallery).py】

相册页手动翻到底,因为网页会一直存储所有URLs,效率高但是费手。还有一个好处是程序会更具月份分类存储爬到的urls,这样更新时就不用下载老的月份,推荐!。



# 手动采集,采集的相册页,通过相册前缀标识区分出时间页面,并分包储存(匹配2.3下载)
 
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
 
# 打网页
def Open_web(her_web):
    # 访问网站
    user_data_dir = r"C:UsersxxxxxxAppDataLocalMicrosoftEdgeUser Data"
    options = Options()
    options.add_argument(f"user-data-dir={user_data_dir}")
    wb = webdriver.Edge(options=options)
    # 输入网址
    time.sleep(1)
    wb.get(her_web)
    print('验证')
    input()
    return wb
 
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
    try:
        pages=wb.find_elements(By.CSS_SELECTOR, '.ProfileAlbum_item_2bkJV')
 
        page_num=0#用于记数网页上所能展示的月份页数
        for page in pages:#月份循环
            page_num+=1
 
            if page_num>len(pages):#如果是获取到的最后一个日期,那信息一定不全,所以不下载
                print('page_num=',page_num,'此页不全执行滚动')
                return
 
            IDs=[]#若没有跳出循环,则说明此页内容完整,可采集,清空列表后采集当月信息
            time1 = page.find_element(By.CSS_SELECTOR, '.ProfileAlbum_y_2dSeV').text
            time2 = page.find_element(By.CSS_SELECTOR, '.ProfileAlbum_m_24O4p').text
            if time1=='':
                time1='2025'
            time=time1+'-'+time2
            print('正在下载 :', time)
 
            lis = page.find_elements(By.CSS_SELECTOR, 'img')
 
            for li in lis:#月份内图片元素循环
                url = li.get_attribute('src')
                note_info = {'aid': '', 'title': '', 'url': url}
 
                # 判别重复
                if note_info not in IDs:
                    print(note_info)
                    IDs.append(note_info)
 
            print(IDs)
            print(time,'共采集到', len(IDs), '条作品')
            save_list(IDs, file_path, NvYou +'_'+time+ '.xlsx')
    except:
        print('元素获取失败')
 
 
 
# 滚动页面
def Scroll_page(wb,previous_height,is_height):
    # 滚动页面直到不再加载新的元素
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(0.1)  # 等待页面加载
    wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
    time.sleep(2)  # 等待页面加载
    # 计算新高度
    new_height = wb.execute_script("return document.body.scrollHeight")
 
    # 检查是否到达页面底部
    if new_height == previous_height:
        wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
        time.sleep(5)  # 等待页面加载
        new_height = wb.execute_script("return document.body.scrollHeight")
        if new_height == previous_height:
            is_height+=1
            print('即将结束采集:', is_height)
        else:
            is_height=0
 
    return is_height
 
 
 
# 储存列表
def save_list(lists, path, name):
    # for li in lists:
    #     print(li)
    # 转变工作目录到指定文件夹
    os.chdir(path)
    # 创建DataFrame对象
    df = pd.DataFrame(lists)
    # 保存为Excel文件
    pd.DataFrame(lists, columns=['aid', "title", "url"]). 
        to_excel(os.path.join(file_path,name), index=False)
    print('列表已保存:', os.path.join(file_path,name))
 
 
# 替换掉不能命名的字符
def replace_x(words):
    words = words.replace('/', '')
    words = words.replace(':', '')
    words = words.replace('*', '')
    words = words.replace('?', '')
    words = words.replace('〈', '')
    words = words.replace('〉', '')
    words = words.replace('|', '')
    return words
 
# 创建文件夹
def folder_creation(path, gallery_name):
    is_exists = os.path.exists(path + gallery_name)  # path是文件夹或者文件的相对路径或者绝对路径
    if is_exists:
        print('文件夹已存在')
 
    else:
        print('文件夹不存在,已创建')
        os.mkdir(path + gallery_name)
 
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
    os.chdir(file_path)
    df = pd.read_excel(excel_name)
    data = df.values.tolist()
    Video_lists = []
    for video_list in data:
        video_dict = {'aid': video_list[0],
                      'title': video_list[1],
                      'url': video_list[2]}
        Video_lists.append(video_dict)
    # print(Video_lists)
    print('共收藏了', len(Video_lists), '条记录')
    return Video_lists
 
# 发出提示音
def Sound_beep():
    duration = 1000  # millisecond
    freq = 440  # Hz
    winsound.Beep(freq, duration)
 
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
 
# 文件保存位置
file_path = r'D:下载站weibodownload'  # 读取列表和下载路径
 
 
 
NvYou = '作者名'
 
upID=NvYou.split('@')[1]
 
IDs_Photo = []
 
# 创建链接
her_web = 'https://weibo.com/u/' + upID + '?tabtype=album'
print('her_web:', her_web)
wb=Open_web(her_web)
 
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
 
previous_height = wb.execute_script("return document.body.scrollHeight")
while True:
    #手动开始滚动,快速直接滚到底,微博网页信息不回被覆盖
    IDs_Photo = Get_IDs(wb, IDs_Photo)
 
    # #自动开始滚动
    # Scroll_page(wb, previous_height, is_height)
 
 
    Sound_beep()
    is_end = int(input('(结束采集按1/或继续滚动后按0)'))
    if is_end == 1:
        break
 
wb.close()

【2.3 Weibo_Note_Download (time gallery).py】

分类下载到每个月份子文件夹中



# 博下载:通过分包的Excel直接下载到有时间分类的子文件夹中(版本答案,匹配1.3获取信息)
import glob
import os
import winsound
import pandas as pd
import requests
 
 
 
# 获取收藏夹列表Video_lists
def Excel_List(file_path, name):
    # 转变工作目录到指定文件夹
    os.chdir(file_path)
    # 读取 Excel 文件
    df = pd.read_excel(name)
    # 转换为列表
    data = df.values.tolist()
    Note_lists = []
    for li in data:
        video_dict = {'aid': str(li[0]),  # 这里不知道为什么突然变成int类型,导致后面匹配的时候识别不到,所以加了str()
                      'title': li[1],
                      'url': li[2]}
        # print(video_dict)
        Note_lists.append(video_dict)
 
    print('共收藏了', len(Note_lists), '条笔记')
    print(Note_lists)
    # print(type(Note_lists[156]['aid']))
    return Note_lists
 
 
 
# 创建文件夹
def folder_creation(path):
    is_exists = os.path.exists(path)  # path是文件夹或者文件的相对路径或者绝对路径
    if is_exists:
        print('文件夹已存在')
 
    else:
        print('文件夹不存在,已创建')
        os.mkdir(path)
 
 
# 通过ID获取响应的函数
def request_header(url):
    # 请求url
    # print(url)
    # 加上一个请求头,伪装成浏览器
    headers = {
        # cookie:用户信息,登录或不登录都有
        'cookie': "你的cookie",
        'referer': '任意该网站域名',
        # user-agent:浏览器信息,版本,电脑
        'user-agent': '你的user-agent'
 
    }
    response = requests.get(url, headers=headers)
    # print(response.text)
    return response, headers
 
# 替换掉不能命名的字符
def replace_x(words):
    words = words.replace('/', '')
    words = words.replace(':', '')
    words = words.replace('*', '')
    words = words.replace('?', '')
    words = words.replace('〈', '')
    words = words.replace('〉', '')
    words = words.replace('>', '')
    words = words.replace('<', '')
    words = words.replace('|', '')
    words = words.replace('"', '')
    words = words.replace('...', '')
    words = words.replace('
', '')
    return words
 
# 下载并保存图片
def download_images(url, save_path):
    try:
        # 取文件名
        filename = url.split('/')[-1]
        save_file = os.path.join(save_path, filename)
 
        # 发送请求(使用已有 headers)
        (response, headers) = request_header(url)
        response = requests.get(url, headers=headers, timeout=10)
 
        if response.status_code == 200:
            with open(save_file, 'wb') as f:
                f.write(response.content)
            print(f"下载成功: {filename}")
        else:
            print(f"下载失败(状态码 {response.status_code}): {filename}")
 
    except Exception as e:
        print(f"下载出错: {url},错误: {e}")
 
 
# 发出提示音
def Sound_beep():
    duration = 1000  # millisecond
    freq = 440  # Hz
    winsound.Beep(freq, duration)
 
 
# 主函数
#获取列表
file_path = r'D:下载站weibo'
NvYou='作者名'
 
 
download_path=os.path.join(file_path,NvYou)
folder_creation(download_path)
xlsx_path=os.path.join(file_path,'0download',NvYou)
print(xlsx_path)
 
# 获取所有 .xlsx 文件的完整路径(不包括临时文件和隐藏文件)
xlsx_files = glob.glob(os.path.join(xlsx_path, '*.xlsx'))
# print(xlsx_files)
# 遍历每一个 Excel 文件
for name in xlsx_files:
 
    #选取文件夹
    name=name.split('')[-1]
    print('正在下载:',name)
    note_list=Excel_List(xlsx_path, name)
 
 
    # 文件保存位置
    time_ID=name.split('_')[-1].replace('月.xlsx','')
    print(time_ID)
    folder_path=os.path.join(download_path,time_ID)
    folder_creation(folder_path)
 
    i=0
    for note in note_list:
        i+=1
        print('正在下载(',i,'/',len(note_list),')')
 
        url=note['url']
        url=url.replace('orj360', 'large')
        print(url)
 
        # 调用下载函数
        download_images(url, folder_path)
 
Sound_beep()
 
 
 
 
 
 
 
 
 

这种方式下的最全最整齐,但是无法获取到视频,也无法将每条动态的标题赋予下载的图片。

二、下后分

爬取动态页(带标题)】后【下载到单个文件夹】后【可更具时间再分类】。(优点:可下载MP4和mov格式,标题信息可以附着到图片,并将日期修改为发布日期,缺点:爬取可能不全)

【1.5 Weibo_Home_page_info (moment).py】

这里爬取的不是相册而是主页动态,会有一些转发内容,也会被一起下载,如果只想要原创内容可在网页的筛选界面手动选择。



# 自动采集,采集的是动态页,采集不全,需要反复确认,,配合(2.5下载)
 
import time
import os
import pandas as pd
import winsound
from selenium.webdriver.edge.options import Options
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
 
# 打网页
def Open_web(her_web):
    # 访问网站
    user_data_dir = r"C:UsersxxxxxxAppDataLocalMicrosoftEdgeUser Data"
    options = Options()
    options.add_argument(f"user-data-dir={user_data_dir}")
    wb = webdriver.Edge(options=options)
    # 输入网址
    time.sleep(1)
    wb.get(her_web)
    print('验证')
    input()
    return wb
 
# 采集作者所有作品的ID的函数
def Get_IDs(wb,IDs):
 
    # 从已有 IDs 中提取所有 URL 作为判重列表
    url_list = [item['url'] for item in IDs]
 
    try:
        lis = wb.find_elements(By.CSS_SELECTOR, '.Feed_body_3R0rO')
 
        for li in lis:
            li_time = li.find_element(By.CSS_SELECTOR,
                                      ' [class="woo-box-flex woo-box-alignCenter woo-box-justifyCenter head-info_info_2AspQ"] a').get_attribute(
                'title')
            li_url = li.find_element(By.CSS_SELECTOR,
                                     ' [class="woo-box-flex woo-box-alignCenter woo-box-justifyCenter head-info_info_2AspQ"] a').get_attribute(
                'href')
 
            # try:
            li_title = li.find_element(By.CSS_SELECTOR,'[class="detail_wbtext_4CRf9"]').text
            # except:li_title = ''
 
            note_info = {'time': li_time, 'title': li_title, 'url': li_url}
 
            # 判别重复
            if note_info['url'] not in url_list:
                print(note_info)
                IDs.append(note_info)
 
        print('当前列表 =', len(IDs))
    except:
        print('元素获取失败')
 
    return IDs
 
# 滚动页面
def Scroll_page(wb, is_height):
    # 如果函数属性 previous_height 不存在,就初始化
    if not hasattr(Scroll_page, "previous_height"):
        Scroll_page.previous_height = wb.execute_script("return document.body.scrollHeight")
 
    # 滚动页面
    for _ in range(2):
        wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
        time.sleep(0.1)
    time.sleep(2)
 
    # 获取新高度
    new_height = wb.execute_script("return document.body.scrollHeight")
 
    if new_height == Scroll_page.previous_height:
        wb.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
        time.sleep(5)
        new_height = wb.execute_script("return document.body.scrollHeight")
        if new_height == Scroll_page.previous_height:
            is_height += 1
            print('即将结束采集:', is_height)
        else:
            is_height = 0
    else:
        is_height = 0
 
    # 更新存储的高度
    Scroll_page.previous_height = new_height
 
    return is_height
 
 
 
# 储存列表
def save_list(lists, path, name):
    # for li in lists:
    #     print(li)
    # 转变工作目录到指定文件夹
    os.chdir(path)
    # 创建DataFrame对象
    df = pd.DataFrame(lists)
    # 保存为Excel文件
    pd.DataFrame(lists, columns=['time', "title", "url"]). 
        to_excel(os.path.join(file_path,name), index=False)
    print('列表已保存:', os.path.join(file_path,name))
 
 
# 替换掉不能命名的字符
def replace_x(words):
    words = words.replace('/', '')
    words = words.replace(':', '')
    words = words.replace('*', '')
    words = words.replace('?', '')
    words = words.replace('〈', '')
    words = words.replace('〉', '')
    words = words.replace('|', '')
    return words
 
# 创建文件夹
def folder_creation(path, gallery_name):
    is_exists = os.path.exists(path + gallery_name)  # path是文件夹或者文件的相对路径或者绝对路径
    if is_exists:
        print('文件夹已存在')
 
    else:
        print('文件夹不存在,已创建')
        os.mkdir(path + gallery_name)
 
# 获取到列表Video_lists
def Excel_List(file_path, excel_name):
    os.chdir(file_path)
    df = pd.read_excel(excel_name)
    data = df.values.tolist()
    Video_lists = []
    for video_list in data:
        video_dict = {'aid': video_list[0],
                      'title': video_list[1],
                      'url': video_list[2]}
        Video_lists.append(video_dict)
    # print(Video_lists)
    print('共收藏了', len(Video_lists), '条记录')
    return Video_lists
 
# 发出提示音
def Sound_beep():
    duration = 1000  # millisecond
    freq = 440  # Hz
    winsound.Beep(freq, duration)
 
# 主函数
# Windows下强制关闭Edge进程
os.system("taskkill /f /im msedge.exe")
os.system("taskkill /f /im msedgedriver.exe")
 
# 文件保存位置
file_path = r'D:下载站weibodownload'  # 读取列表和下载路径
 
 
NvYou = '作者名'
upID=NvYou.split('@')[1]
 
IDs_Photo = []
 
# 创建链接
her_web = 'https://weibo.com/u/' + upID
print('her_web:', her_web)
wb=Open_web(her_web)
 
# 获取原有界面高度,并初始化判别高度变化is_height
is_height=0
while True:
    IDs_Photo = Get_IDs(wb, IDs_Photo)
    is_height=Scroll_page(wb, is_height)
 
    if is_height == 2:  # 如果滚不动了,弹出判别框
        Sound_beep()
        is_end = int(input('是否结束采集(1/0)'))
        if is_end == 1:
            print(IDs_Photo)
            print('共采集到', len(IDs_Photo), '条作品')
            save_list(IDs_Photo, file_path, NvYou + '.xlsx')
            break
        else:
            is_height = 0
 
wb.close()

【2.5 Weibo_Note_Download (moment).py】

这里下载的内容包括了.jpg, .mp4, .mov,但是全部放在根文件,如果还想按照月份分类的话也可以,因为这里下载下来已经把文件时间修改为了动态的发布时间。



# 微博下载:获取到的是整条微博信息,可能包含若干图片,命名方式以原名称+title形式,并更改其日期
# 导入时间模块
import time
from datetime import datetime
import os
 
import re
 
import winsound
import pandas as pd
import requests
from sympy.codegen.ast import continue_
 
 
# 获取收藏夹列表Video_lists
def Excel_List(file_path, name):
    # 转变工作目录到指定文件夹
    os.chdir(file_path)
    # 读取 Excel 文件
    df = pd.read_excel(name)
    # 转换为列表
    data = df.values.tolist()
    Note_lists = []
    for li in data:
        video_dict = {'time': str(li[0]),  # 这里不知道为什么突然变成int类型,导致后面匹配的时候识别不到,所以加了str()
                      'title': li[1],
                      'url': li[2]}
        # print(video_dict)
        Note_lists.append(video_dict)
 
    print('共收藏了', len(Note_lists), '条笔记')
    # print(Note_lists)
    return Note_lists
 
 
 
# 创建文件夹
def folder_creation(path):
    is_exists = os.path.exists(path)  # path是文件夹或者文件的相对路径或者绝对路径
    if is_exists:
        print('文件夹已存在')
 
    else:
        print('文件夹不存在,已创建')
        os.mkdir(path)
 
 
# 通过ID获取响应的函数
def request_header(url):
    # 请求url
    # print(url)
    # 加上一个请求头,伪装成浏览器
    headers = {
        # cookie:用户信息,登录或不登录都有
        'cookie': "你的cookie",
        'referer': '任意该网站网址',
        # user-agent:浏览器信息,版本,电脑
        'user-agent': '你的user-agent'
 
    }
    response = requests.get(url, headers=headers)
    # print(response.text)
    return response, headers
 
 
 
def find_urls(response):
    url_list = []
    # 第一步:匹配 pic_ids 方括号内的内容
    try:
        ids_block = re.findall(r'"pic_ids":s*[([sS]*?)],', response.text, re.S)
        # 第二步:从捕获的方括号内容里提取每个 ID
        if ids_block:  # 确保有匹配结果
            ids_list = re.findall(r'"([0-9a-zA-Z]{32})"', ids_block[0])
        # 第三步:从捕获的ID中寻找完整的url
        for pic_id in ids_list:
            url=re.findall(rf'"url":s*"([^"]*large[^"]*{pic_id}[^"]*)"', response.text)[0]
            if url not in url_list:
                # print(url)
                url_list.append(url)
    except:
        pass
 
    # 第四步:查看是否有视频信息
    try:
        ids_block=re.findall('"videoSrc":s*"(.*?)"', response.text)
        if ids_block:
            print('多个视频')
        for url in ids_block:
            # print(url)
            url_list.append(url)
    except:
        pass
 
    # 第五步:查看是否有多个视频信息
    try:
        url = re.findall('"mp4_720p_mp4":s*"(.*?)"', response.text)[0]
        if url :
            print('单个视频')
        url_list.append(url)
    except:
        pass
 
    print('这条微博共采集到',len(url_list),'个域名:',url_list)
    return url_list
 
# 下载并保存图片
def download_images(url, save_path,title):
    filename=''
    try:
        # 取文件名
        if '.jpg' in url:
            if type(title)==str:
                filename = url.split('.jpg')[0].split('/')[-1]+'-'+title+'.jpg'
            else:
                filename = url.split('/')[-1]
 
        elif 'video' in url:
            if 'mov' in url:
                filename = url.split('%')[-1].replace('.mov','') +'-'+title+'.mov'
            else:
                filename = url.split('?')[0].replace('.mp4', '').split('/')[-1] +'-'+title+'.mp4'
        # print('正在下载文件:',filename)
        save_file = os.path.join(save_path, filename)
 
        # 发送请求(使用已有 headers)
        (response, headers) = request_header(url)
        response = requests.get(url, headers=headers, timeout=10)
 
        if response.status_code == 200:
            with open(save_file, 'wb') as f:
                f.write(response.content)
            print(f"下载成功: {filename}")
        else:
            print(f"下载失败(状态码 {response.status_code}): {filename}")
 
 
    except Exception as e:
        print(f"下载出错: {url},错误: {e}")
 
    return os.path.join(save_path,filename)
 
 
# 修改时间的函数
def modify_time(whole_file_path, release_time):
    release_time=release_time+':00'
    print(release_time)
    a = datetime.strptime(release_time, '%Y-%m-%d %H:%M:%S')
    a.timestamp()
    os.utime(whole_file_path, (a.timestamp(), a.timestamp()))  # 只能修改 访问时间 与 修改时间(不能修改创建时间)
 
# 替换掉不能命名的字符
def replace_x(filename: str, replacement: str = '_') -> str:
    """
    将任意字符串转换为合法的 Windows 文件名

    filename: 原始字符串
    replacement: 替换非法字符的字符,默认 '_'
    """
    # 1. 替换 Windows 不允许的字符
    illegal_chars = r'[<>:"/|?*

	]'
    filename = re.sub(illegal_chars, replacement, filename)
 
    # 2. 替换连续多个替换字符为单个
    filename = re.sub(f'{re.escape(replacement)}+', replacement, filename)
 
    # 3. 去掉开头和结尾的空格和点
    filename = filename.strip(' .')
 
    # 4. 可选:去掉连续的三个点 '...'
    filename = filename.replace('...', replacement)
 
    return filename
 
 
# 发出提示音
def Sound_beep():
    duration = 1000  # millisecond
    freq = 440  # Hz
    winsound.Beep(freq, duration)
 
 
# 主函数
#获取列表
file_path = r'D:下载站weibo'
NvYou='作者名'
download_path=os.path.join(file_path,NvYou)
folder_creation(download_path)
xlsx_path=os.path.join(file_path,'0download')
note_list=Excel_List(xlsx_path, NvYou+'.xlsx')
 
 
# 文件保存位置
i=0
for note in note_list:
    i+=1
    print('正在下载(',i,'/',len(note_list),')')
    url=note['url']
    title = note['title']
    release_time=note['time']
    if type(title)==str:
        title=replace_x(title)
    # url='https://weibo.com/5532569093/PBvWfstnU'    #视频测试
    # url='https://weibo.com/2028473200/PFPdSygRA'    #多图测试
    # url='https://weibo.com/5681691748/PzfeMp3Jr'    #多视频测试
    # title='长时间哦ID放假哦'
    print(url)
 
    #解析url中的response
    (response, header)= request_header(url)
 
    #解析url中的图片域名
    photo_urls=find_urls(response)
 
    for url in photo_urls:
        # 调用下载函数
        whole_img_path=download_images(url, download_path,title)
        # time.sleep(0.5)
        # 更正时间
        try:
            modify_time(whole_img_path, release_time)
        except:
            Sound_beep()
            print('更新时间失败')
 
 
 
 
 
 
 
 
 
 
 
 

【3.5 Sort out by time (moment).py】

如果还想按时间分类到月份子文件夹,可以执行这个代码



import os
import shutil
from datetime import datetime
 
# 图片所在文件夹
source_folder = r"D:下载站视频"
 
# 遍历文件夹中的所有文件
for filename in os.listdir(source_folder):
    file_path = os.path.join(source_folder, filename)
 
    # 只处理图片文件
    if os.path.isfile(file_path) and filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp','mp4','mov')):
        # 获取文件的创建时间(也可以改为修改时间 os.path.getmtime)
        # timestamp = os.path.getctime(file_path)
        timestamp = os.path.getmtime(file_path)
        file_date = datetime.fromtimestamp(timestamp)
 
        # 格式化成 "YYYY-M" 形式
        folder_name = f"{file_date.year}-{file_date.month}"
        target_folder = os.path.join(source_folder, folder_name)
 
        # 创建子文件夹(如果不存在)
        os.makedirs(target_folder, exist_ok=True)
 
        # 移动文件到对应子文件夹
        shutil.move(file_path, os.path.join(target_folder, filename))
 
print("图片分类完成!")

下后分的有点就是更全的文件信息,但是因为爬取动态的过程可能会遗漏部分未显示的动态,所以不是最全的,我一般会把两种方法结合在一起使用,也就是下两遍,至于重复文件,可以通过对比aid来删除。下面介绍。

三、若是两种方法都使用,可对比aid删除重复文件

【4.0 duplicate_Image_compare_aid (all).py】



import os
import hashlib
import shutil
 
 
def collect_aid(folder):
    aids=[]
    for root, _, files in os.walk(folder):
        for filename in files:
            file_path = os.path.join(root, filename)
            aid=filename.split('-')[0]
            if os.path.isfile(file_path):
                aids.append(aid)
    return aids
 
def move_duplicates_between_folders(folder1, folder2, destination_folder):
    """
    递归比较 folder1 和 folder2 中的文件,
    如果 folder2 中的文件与 folder1 中的文件相同,则移动到 destination_folder。
    """
    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)
 
    # 收集 folder1 的所有文件哈希
    aid_folder1 = collect_aid(folder1)
    print(aid_folder1)
 
    # 检查 folder2 的所有文件(倒序处理)
    for root, _, files in os.walk(folder2):
        for filename in sorted(files, key=lambda x: x.lower(), reverse=True):
            file_path = os.path.join(root, filename)
            aid2=filename.split('.')[0]
            if aid2 in aid_folder1:
                # 目标文件路径(保持原文件名)
                dest_path = os.path.join(destination_folder, filename)
                print(f"发现重复文件:{file_path},移动到 {dest_path}")
                shutil.move(file_path, dest_path)
 
# 路径
path1 = r'D:下载站'       # 对照文件夹(递归扫描)
path2 = r'D:中转站'       # 待处理文件夹(递归扫描)
path3 = r'C:Users86158Desktop重复图片'  # 存放重复文件
 
# 执行
move_duplicates_between_folders(path1, path2, path3)

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容