📅  最后修改于: 2023-12-03 15:40:44.824000             🧑  作者: Mango
流式上传请求是指将文件分块上传,每上传一个块就立即对服务器进行一次请求,直到文件全部上传完成。这种上传方式和传统上传相比,具有更好的灵活性和可靠性。
Python 语言具有丰富的 HTTP 请求库,下面介绍如何使用 requests
库实现流式上传请求。
流式上传请求需要实现以下功能:
为了实现以上功能,需要借助以下包:
requests
:发送 HTTP 请求tqdm
:展示上传进度以下为 stream_upload.py
的代码:
import os
import requests
from tqdm import tqdm
# 定义分块大小,单位为字节
CHUNK_SIZE = 1024 * 1024 * 5
class StreamUpload:
def __init__(self, file_path, url):
self.file_path = file_path
self.url = url
self.file_size = os.path.getsize(file_path)
self.sent_bytes = 0
self.headers = {"Content-Type": "application/octet-stream", "User-Agent": "Mozilla/5.0"}
self.chunk_size = CHUNK_SIZE
self.chunk_num = (self.file_size + self.chunk_size - 1) // self.chunk_size
def upload(self):
with open(self.file_path, "rb") as f:
for i in tqdm(range(self.chunk_num), total=self.chunk_num):
chunk = f.read(self.chunk_size)
self.sent_bytes += len(chunk)
res = requests.post(self.url, data=chunk, headers=self.headers)
if res.status_code != 200:
print(f"Upload failed with status code: {res.status_code}.")
return False
print("Upload finished.")
return True
def resume_upload(self):
self.headers["Content-Range"] = f"bytes {self.sent_bytes}-{self.file_size-1}/{self.file_size}"
with open(self.file_path, "rb") as f:
f.seek(self.sent_bytes)
for i in tqdm(range(self.chunk_num - self.sent_bytes // self.chunk_size), total=self.chunk_num - self.sent_bytes // self.chunk_size):
chunk = f.read(self.chunk_size)
self.sent_bytes += len(chunk)
res = requests.post(self.url, data=chunk, headers=self.headers)
if res.status_code != 200:
print(f"Upload failed with status code: {res.status_code}.")
return False
print("Upload finished.")
return True
if __name__ == '__main__':
# 测试上传文件
file_url = "your_upload_url"
file_path = "your_file_path"
stream_upload = StreamUpload(file_path=file_path, url=file_url)
if os.path.exists("progress.txt"):
with open("progress.txt", "r") as fi:
stream_upload.sent_bytes = int(fi.read())
if stream_upload.resume_upload():
os.remove("progress.txt")
else:
if stream_upload.upload():
print(f"\n{file_path} uploaded successfully.")
```
## 使用方法
1. 安装依赖包:
```shell
pip install requests tqdm
在 stream_upload.py
中找到以下代码块,修改成实际的上传地址和文件路径:
file_url = "your_upload_url"
file_path = "your_file_path"
stream_upload = StreamUpload(file_path=file_path, url=file_url)
运行 stream_upload.py
,上传进度以进度条的形式展示。
python stream_upload.py
如果上传失败,程序会输出上传失败的原因。
如果上传中途出现异常,可以通过以下步骤恢复上传:
progress.txt
的空文件。本文介绍了如何使用 Python 实现流式上传请求,实现了分块上传、进度展示和断点续传等功能。该上传方式在上传大文件时具有优势,对于对上传速度和可靠性要求较高的场景可使用流式上传请求。