之前一直把 B 站动态当朋友圈发,搭了 WordPress 之后,就想把以前的内容搬到这里来。主要做了三件事:第一,使用 B 站的分页接口下载全部动态内容——总共才 8 页,所以这一步是手动完成的;第二,把有用的数据导出成 CSV 文件,并把图片重命名后下载到本地;第三,通过 WordPress 的文章接口和媒体文件上传接口批量上传动态,最后再把每篇文章的发布时间修改为原动态对应的时间点。
一、下载动态内容
二、整理数据下载动态图片
# 把bili-orgin 里面的json文件一个一个遍历 并且把每个json文件的item项写出到一个csv文件中
# 要求必须有id_str,topic.name,summary.text,pics 的全部图片url ,
# 并且创建一个文件夹 把所有的图片下载到文件夹中,最好插入到csv文件中
import os
import csv
import requests
import json # 添加这行来导入json模块
from datetime import datetime
def extract_image_urls(json_data):
    """Flatten one downloaded feed page into CSV-ready records.

    For every entry in ``json_data['data']['items']`` this collects the
    dynamic id, the author name, the post text and the publish time, and
    downloads the post's pictures into the ``all_images`` folder through
    ``download_images``.

    Args:
        json_data: Parsed JSON of one feed page (a dict).

    Returns:
        A list of dicts with the keys ``id_str``, ``topic_name``,
        ``summary_text``, ``image_urls`` (comma-joined target URLs under
        ``https://iotie.cn/wp-content/uploads/2024/10/``) and ``pub_time``
        (``YYYY-MM-DD HH:MM:SS`` in the local time of the machine running
        this script).
    """
    # ``x.get(key) or {}`` instead of ``x.get(key, {})``: a key that is
    # present with a null value would otherwise yield None and crash on the
    # next ``.get``.
    items = (json_data.get('data') or {}).get('items') or []
    results = []
    for item in items:
        id_str = item.get('id_str', '')
        modules = item.get('modules') or {}
        author = modules.get('module_author') or {}
        # NOTE(review): this column is called topic_name but is filled with
        # the author's name; the header comment of the script asks for
        # topic.name. Kept as-is -- confirm which value is wanted.
        topic_name = author.get('name', '')
        dynamic = modules.get('module_dynamic') or {}
        major = dynamic.get('major') or {}
        opus = major.get('opus') or {}
        summary = opus.get('summary') or {}
        summary_text = summary.get('text', '')

        pub_ts = author.get('pub_ts') or 0
        pub_time = datetime.fromtimestamp(pub_ts).strftime('%Y-%m-%d %H:%M:%S')
        print(pub_time)

        # Keep only pictures that actually carry a URL, then number them
        # from 1 -- the same numbering download_images uses for file names,
        # so every URL written to the CSV corresponds to a file on disk.
        original_urls = [
            pic['url'] for pic in (opus.get('pics') or []) if pic.get('url')
        ]
        image_urls = []
        for number, original_url in enumerate(original_urls, start=1):
            file_extension = os.path.splitext(original_url)[1]
            image_urls.append(
                f"https://iotie.cn/wp-content/uploads/2024/10/{id_str}_{number}{file_extension}"
            )

        # One call per post with the full list. Calling it once per picture
        # with a single-element list made every picture of a post land in
        # the same ``{id_str}_1`` file, each overwriting the previous one.
        if original_urls:
            download_images(id_str, original_urls, 'all_images')

        results.append({
            'id_str': id_str,
            'topic_name': topic_name,
            'summary_text': summary_text,
            'image_urls': ','.join(image_urls),
            'pub_time': pub_time,
        })
    return results
def process_json_files(directory, csv_file='output---time.csv'):
    """Convert every ``*.json`` feed page in ``directory`` into one CSV file.

    Each page is parsed and passed to ``extract_image_urls`` (which also
    downloads the pictures); one CSV row is written per returned record.

    Args:
        directory: Folder containing the downloaded JSON pages.
        csv_file: Path of the CSV to (over)write. Defaults to the name the
            script has always used.
    """
    columns = ['id_str', 'topic_name', 'summary_text', 'image_urls', 'pub_time']
    with open(csv_file, 'w', newline='', encoding='utf-8') as csv_handle:
        writer = csv.writer(csv_handle)
        writer.writerow(columns)
        # os.listdir returns names in arbitrary order; sorting makes the row
        # order reproducible between runs. The sort is lexicographic, so
        # "10.json" comes before "2.json".
        for filename in sorted(os.listdir(directory)):
            if not filename.endswith('.json'):
                continue
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as json_handle:
                json_data = json.load(json_handle)
            for result in extract_image_urls(json_data):
                # image_urls is already a comma-joined string here.
                writer.writerow([result[column] for column in columns])
def download_images(id_str, image_urls, output_folder, timeout=30):
    """Download pictures into ``output_folder`` as ``{id_str}_{n}{ext}``.

    ``n`` starts at 1 and follows the order of ``image_urls``; ``ext`` is
    whatever ``os.path.splitext`` finds at the end of the URL. Existing
    files are overwritten. A failed download is reported and skipped, so
    one bad URL does not stop the rest.

    Args:
        id_str: Identifier used as the file-name prefix.
        image_urls: Iterable of picture URLs.
        output_folder: Destination folder; created when missing.
        timeout: Seconds to wait for the server before giving up on one
            picture. Without a timeout a stalled connection would block
            the whole run indefinitely.
    """
    os.makedirs(output_folder, exist_ok=True)
    for index, url in enumerate(image_urls):
        file_extension = os.path.splitext(url)[1]
        file_name = f'{id_str}_{index + 1}{file_extension}'
        file_path = os.path.join(output_folder, file_name)
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()  # treat HTTP error statuses as failures
            payload = response.content
        except requests.exceptions.RequestException as e:
            print(f"下载失败: {url}. 错误: {e}")
            continue
        with open(file_path, 'wb') as image_file:
            image_file.write(payload)
        print(f"下载成功: {file_path}")
if __name__ == "__main__":
    # Script entry point: convert the feed pages stored in 'bili-orgin'.
    process_json_files('bili-orgin')
三、通过wordpress 接口上传动态
# 使用接口去插入帖子并且插入图片
import csv
import requests
import os
import json
import urllib.parse
# WordPress REST API endpoints used by this script:
# MEDIA_API receives the picture uploads, POST_API creates the posts.
MEDIA_API = "https://iotie.cn/index.php?rest_route=/wp/v2/media"
POST_API = "https://iotie.cn/index.php?rest_route=/wp/v2/notes/&_locale=user"
# Request headers shared by every call below.
# The "Cookie" value is a placeholder and must be replaced before running.
# NOTE(review): the "X-WP-Nonce" value is hard-coded; presumably it has to be
# refreshed together with the cookie -- confirm before running.
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cache-Control": "no-cache",
"Content-Type": "application/json",
"Cookie": "你的cookie",
# "Host": "124.70.149.61:8089",
# "Origin": "https://iotie.cn",
"Pragma": "no-cache",
# "Referer": "https://iotie.cn/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
"X-WP-Nonce": "0c63a7558f"
}
import mimetypes
def get_content_type(filename):
    """Guess the MIME type for ``filename`` from its extension.

    Falls back to ``'application/octet-stream'`` when the extension is
    unknown or missing.
    """
    guessed = mimetypes.guess_type(filename)[0]
    if not guessed:
        return 'application/octet-stream'
    return guessed
def upload_image(image_path):
    """Upload one local picture to the media endpoint as multipart form data.

    Args:
        image_path: Path of the picture on disk.

    Returns:
        The ``id`` field of the JSON response when the server answers 201,
        otherwise ``None``.
    """
    file_name = os.path.basename(image_path)
    # The shared headers announce a JSON body; drop Content-Type so that
    # requests can set the multipart one (with its boundary) itself.
    headers_upload = headers.copy()
    headers_upload.pop('Content-Type', None)
    with open(image_path, 'rb') as img:
        files = {'file': (file_name, img, get_content_type(file_name))}
        # (connect, read) timeouts: without them a stalled server would
        # block the whole batch forever.
        response = requests.post(
            MEDIA_API,
            headers=headers_upload,
            files=files,
            timeout=(10, 120),
        )
    if response.status_code != 201:
        print(f"上传图片失败: {response.status_code}, {response.text}")
        return None
    print(f"图片上传成功,响应内容: {response.text}")
    return response.json()['id']
def create_post(content, image_ids):
    """Create one private post through POST_API.

    The text is wrapped as ``<p>#🌈日常 {content}</p>`` and sent unescaped;
    the date-sync script later finds the post by searching for the raw
    text, so the text must be stored unchanged.

    Args:
        content: Post text.
        image_ids: Media ids to attach. When non-empty they are sent as a
            comma-joined ``images`` field; when empty no field is sent.
    """
    post_data = {
        "content": f"<p>#🌈日常 {content}</p>",
        "status": "private",
        "comment_status": "open",
        "ping_status": "open",
        "topics": ["日常"],
    }
    if image_ids:
        joined_ids = ",".join(map(str, image_ids))
        post_data["fields"] = [{"name": "images", "value": joined_ids}]
    # (connect, read) timeouts: without them a stalled server would block
    # the whole batch forever.
    response = requests.post(
        POST_API,
        headers=headers,
        json=post_data,
        timeout=(10, 60),
    )
    if response.status_code == 201:
        print(f"帖子创建成功,响应内容: {response.text}")
    else:
        print(f"创建帖子失败: {response.status_code}, {response.text}")
def extract_filename_from_url(url):
    """Return the last path component of ``url``, ignoring query and fragment."""
    return os.path.basename(urllib.parse.urlparse(url).path)
def main():
    """Create one post per row of ``output-new.csv``.

    For each row the pictures listed in ``image_urls`` are looked up by
    file name in the local ``downloaded_images`` folder and uploaded; the
    post is then created with whatever pictures uploaded successfully
    (possibly none). Rows without text or without an id are skipped.
    """
    with open('output-new.csv', 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        print("CSV列名:", reader.fieldnames)
        # NOTE(review): DictReader has already consumed the header line, so
        # this drops the first *data* row. Kept because the original comment
        # says so explicitly -- confirm that row is meant to be skipped.
        next(reader, None)
        for row in reader:
            print("处理行:", row)
            # DictReader fills columns missing from a short row with None,
            # so normalise with ``or ''`` before calling string methods.
            content = row.get('summary_text') or ''
            id_str = row.get('id_str') or ''
            image_urls = (row.get('image_urls') or '').split(',')
            if not content or not id_str:
                print("错误: 无法找到必要的列 'summary_text' 或 'id_str'")
                continue

            image_ids = []
            for image_url in image_urls:
                image_url = image_url.strip()
                if not image_url:
                    continue
                image_filename = extract_filename_from_url(image_url)
                # NOTE(review): the download script writes to 'all_images';
                # this folder name presumably reflects a manual rename --
                # confirm.
                image_path = os.path.join('downloaded_images', image_filename)
                if not os.path.exists(image_path):
                    print(f"图片不存在: {image_path}")
                    continue
                print(f"处理图片: {image_path}")
                image_id = upload_image(image_path)
                if image_id:
                    image_ids.append(image_id)

            if not image_ids:
                print("没有找到或上传成功的图片,创建纯文本帖子")
            create_post(content, image_ids)
            print("----------------------------")  # visual separator between rows
if __name__ == "__main__":
    # Script entry point: upload every row of the CSV as a post.
    main()
同步日期
import csv
import mysql.connector
from datetime import datetime
# Flat script: set the dates of the imported posts to the original publish
# times stored in output-new.csv, matching each post by its text.
from datetime import timezone  # needed below for the *_gmt columns

# Database connection settings; fill these in before running.
db_config = {
    'host': '',
    'user': '',
    'password': '',
    'database': ''
}


def _escape_like(text):
    """Escape the LIKE wildcards so ``text`` is matched literally."""
    return text.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')


update_query = """
UPDATE wp_posts
SET post_date = %s,
post_date_gmt = %s,
post_modified = %s,
post_modified_gmt = %s
WHERE post_content LIKE %s
"""

conn = mysql.connector.connect(**db_config)
try:
    cursor = conn.cursor()
    try:
        with open('output-new.csv', 'r', encoding='utf-8') as csvfile:
            for row in csv.DictReader(csvfile):
                summary_text = row['summary_text']
                if not summary_text:
                    # An empty text would produce the pattern '%%', which
                    # matches -- and would re-date -- every post in the table.
                    print(f"跳过空内容的行: {row.get('id_str', '')}")
                    continue
                date_obj = datetime.strptime(row['pub_time'], '%Y-%m-%d %H:%M:%S')
                mysql_date = date_obj.strftime('%Y-%m-%d %H:%M:%S')
                # The *_gmt columns hold UTC. astimezone() treats the naive
                # value as local time of this machine -- this assumes the
                # script runs in the time zone the CSV was generated in.
                mysql_date_gmt = date_obj.astimezone(timezone.utc).strftime(
                    '%Y-%m-%d %H:%M:%S'
                )
                pattern = f'%{_escape_like(summary_text)}%'
                cursor.execute(
                    update_query,
                    (mysql_date, mysql_date_gmt, mysql_date, mysql_date_gmt, pattern),
                )
                if cursor.rowcount > 0:
                    print(f"更新成功: {summary_text}")
                else:
                    print(f"未找到匹配记录: {summary_text}")
        # Commit once, only after every row has been processed without error.
        conn.commit()
    finally:
        cursor.close()
finally:
    # Release the connection even when a row fails to parse or update.
    conn.close()
print("所有更新已完成")