Merge pull request #16 from script-money/fix/network
✨ support resuming from a breakpoint when uploading images
script-money authored Sep 15, 2022
2 parents d9c2af5 + 9c753aa commit 0dd584a
Showing 3 changed files with 144 additions and 57 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -11,4 +11,5 @@ config-*.py
*.zip
*.backup
*.ipynb
.vscode/
.vscode/
*.json
3 changes: 2 additions & 1 deletion src/config.py
@@ -62,8 +62,9 @@ class Quality(Enum):


# upload----------------------------------------------------------------------------------------------
UPLOAD_METADATA = False # set to False if you don't want to upload metadata
UPLOAD_METADATA = True # set to False if you don't want to upload metadata
PIN_FILES = False # set to True if you want to pin files permanently
IPFS_INFO_BACKUP = "./ipfs_info_backup.json" # local backup of uploaded ipfs info
PROXIES: ProxiesTypes = {
"http://": "http://127.0.0.1:7890",
"https://": "http://127.0.0.1:7890",
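For reference, the file named by IPFS_INFO_BACKUP is a plain JSON list of the per-file records returned by IPFS add; a minimal sketch of its contents, with hypothetical hash and size values:

[
  {"Name": "0.png", "Hash": "QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv", "Size": "102400"},
  {"Name": "1.png", "Hash": "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG", "Size": "98304"}
]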
195 changes: 140 additions & 55 deletions src/upload.py
@@ -1,7 +1,12 @@
import asyncio
import os
from typing import Optional, cast
from httpx import AsyncClient, Limits, ReadTimeout, Client, ConnectError, Response
from typing import Optional, TypedDict, cast
from httpx import (
AsyncClient,
Limits,
Client,
Response,
)
import json
import pandas as pd
import random
@@ -15,26 +20,36 @@
PROXIES,
UPLOAD_METADATA,
PIN_FILES,
IPFS_INFO_BACKUP,
)

from final_check import RENAME_DF, START_ID


IPFSInfo = TypedDict("IPFSInfo", {"Name": str, "Hash": str, "Size": str})
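# For illustration (hypothetical values): IPFSInfo models the JSON object the
# IPFS add endpoint returns for each uploaded file, e.g.
# {"Name": "0.png", "Hash": "Qm...", "Size": "123456"}  # note Size is a string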


class MaxRetryReachException(Exception):
pass


async def upload_task(
files_path_chunk: list[str], wait_seconds: int
files_path_chunk: list[str], wait_seconds: float
) -> Optional[list[dict]]:
"""
upload task for asyncio, a task process 10 files
Args:
files_path_chunk (list[str]): a list contain 10 files path
wait_seconds (int): because infura limit, 1 second can post 10 times, add wait_seconds to wait
wait_seconds (float): because infura limit, 1 second can post 10 times, add wait_seconds to wait
Returns:
list[dict]: 10 files ipfs info
Optional[list[dict]]: 10 files ipfs info
"""
await asyncio.sleep(wait_seconds)

async with AsyncClient(
proxies=PROXIES, limits=Limits(max_connections=10), timeout=60
proxies=PROXIES, limits=Limits(max_keepalive_connections=None), timeout=60
) as client:
loop = asyncio.get_running_loop()
tasks = [
@@ -56,11 +71,14 @@ async def upload_single_async(client: AsyncClient, file_path: str) -> Optional[d
file_path (str): path of the file to upload
Returns:
dict: ipfs info json
Optional[dict]: ipfs info json
"""
retry = 0
while retry < 5:
max_retries = 3
while retry <= max_retries:
try:
if retry == max_retries:
raise MaxRetryReachException()
response: Response = await client.post(
f"https://ipfs.infura.io:5001/api/v0/add",
params={
@@ -72,18 +90,17 @@ async def upload_single_async(client: AsyncClient, file_path: str) -> Optional[d
if response.status_code == 401:
print("Project ID and scecret is invalid")
exit()
if response.status_code == 403:
print(f"403 error: {response.content}")
exit()
res_json: dict = response.json()
if res_json["Name"] != "":
return res_json
except MaxRetryReachException:
input("Max retry reach, seems proxy or network error, Press Ctrl+C stop")
except Exception as e:
if isinstance(e, ReadTimeout):
print(f"upload {file_path.split('-')[0]} timeout, retry {retry}")
elif isinstance(e, ConnectError):
print(f"can't connect to ipfs, please check network or proxy setting")
exit()
else:
print(f"upload {file_path.split('-')[0]} error, exit")
exit()
print(f"{file_path.split('-')[0]} {type(e).__name__} error, retry {retry}")
await asyncio.sleep(retry + 1)
retry += 1
return None
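# Timeline sketch for the retry loop above, assuming every attempt fails and
# that the sleep/increment run after each caught exception: the POST is tried
# at retry 0, 1 and 2 with waits of roughly 1 s, 2 s and 3 s in between; once
# retry reaches max_retries (3), MaxRetryReachException is raised and input()
# pauses so the user can fix the proxy or network (or abort with Ctrl+C).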

@@ -141,7 +158,32 @@ def upload_folder(
return (folder_hash, images_dict_list)


def upload_files(folder_name: str, content_type: str = "image/png") -> list[dict]:
def dump_ipfs_info_list_to_local(results: list[IPFSInfo]) -> list[IPFSInfo]:
"""
dump the ipfs info list to the local IPFS_INFO_BACKUP file
Args:
results (list[IPFSInfo]): newly fetched ipfs info to persist
Returns:
list[IPFSInfo]: the full ipfs info list that was saved
"""
if os.path.exists(IPFS_INFO_BACKUP):
with open(IPFS_INFO_BACKUP, "r") as f:
backup_data: list[IPFSInfo] = json.loads(f.read()) # type: ignore
with open(IPFS_INFO_BACKUP, "w") as g:
backup_data.extend(results)
g.write(json.dumps(backup_data))
print(f"save new {len(backup_data)} ipfs info to local.")
return backup_data
else:
with open(IPFS_INFO_BACKUP, "w") as g:
g.write(json.dumps(results))
print(f"save new {len(results)} ipfs info to local.")
return results
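# Minimal usage sketch with hypothetical values: if the backup file already
# holds [{"Name": "0.png", ...}], then
#     dump_ipfs_info_list_to_local([{"Name": "1.png", "Hash": "Qm...", "Size": "98304"}])
# re-reads the JSON, extends it with the new record and rewrites the file, so
# both entries are persisted and the merged list is returned.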


def upload_files(file_paths: list[str]) -> list[IPFSInfo]:
"""
upload the given files to ipfs
@@ -150,46 +192,51 @@ def upload_files(folder_name: str, content_type: str = "image/png") -> list[dict
content_type (str, optional): mime file type. Defaults to "image/png".
Returns:
list[dict]: ipfs info list, example: [{ 'Name': str, 'Hash': str, 'Size': str }]
list[IPFSInfo]: ipfs info list, example: [{ 'Name': str, 'Hash': str, 'Size': str }]
"""
extension = content_type.split("/")[-1]
file_paths = [
os.path.join(folder_name, file_path)
for file_path in list(
filter(lambda i: i.split(".")[-1] == extension, os.listdir(folder_name))
)
]
file_count = len(file_paths)
chunk_size = 10 # 10 per second for infura
chunks = [file_paths[i : i + chunk_size] for i in range(0, file_count, chunk_size)]
tasks = []
results = []
results: list[IPFSInfo] = []

def complete_batch_callback(images_ipfs_data):
results.append(images_ipfs_data.result())
if results[0] == None:
ipfs_result: list[IPFSInfo] = images_ipfs_data.result()
for ipfs_info in ipfs_result:
results.append(ipfs_info)
if ipfs_result[0] == None:
print("No upload info return")
exit()
print(f"complete {len(results)/len(chunks):.2%}")
print(f"complete {len(results)/file_count:.2%}")

loop = asyncio.get_event_loop()
if file_count == 0:
print(f"no any images in folder {IMAGES}")
exit()
print(f"Total {file_count} files to upload, estimate time: {len(chunks)+10}s")
# get average file size in IMAGES folder
file_size = sum([os.path.getsize(i) for i in file_paths]) / file_count
epoch_wait: float = file_size / 100000 # 1 second can upload 10 100kB size files
print(
f"Total {file_count} files to upload, estimate time: {len(chunks)*epoch_wait+10:.1f}s"
) # 200kB per second

for epoch, path_chunk in enumerate(chunks):
task = loop.create_task(upload_task(path_chunk, epoch))
task = loop.create_task(upload_task(path_chunk, epoch * epoch_wait))
tasks.append(task)
task.add_done_callback(complete_batch_callback)

loop.run_until_complete(asyncio.wait(tasks))
print(f"upload {len(results)} files complete.")
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt:
dump_ipfs_info_list_to_local(results)
exit()
print(f"upload new {len(results)} files complete.")
return results
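# Worked example for the schedule above, with illustrative numbers: 100 files
# averaging 300 kB give file_size = 300_000 and epoch_wait = 3.0 s; with
# chunk_size = 10 there are 10 chunks, chunk i starts after i * 3.0 s, and the
# printed estimate is 10 * 3.0 + 10 = 40.0 s.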


def generate_metadata(
df: pd.DataFrame,
image_ipfs_data: list[dict],
image_ipfs_data: list[IPFSInfo],
start_id: int = 0,
image_folder: str = IMAGES,
metadata_folder: str = METADATA,
@@ -199,7 +246,7 @@ def generate_metadata(
Args:
df (pd.DataFrame): imagepath and metadata dataframe in final_check.py
image_ipfs_data (dict): image ipfs data from upload_folder
image_ipfs_data (list[IPFSInfo]): image ipfs data from upload_folder
start_id (int, optional): start index. Defaults to 0.
image_folder (str, optional): images folder to use compare. Defaults to IMAGES.
metadata_folder (str, optional): metadata save folder. Defaults to METADATA.
@@ -243,50 +290,88 @@ def generate_metadata(
return (start_id, start_id + len(df) - 1)


def read_images_from_local() -> list[dict]:
def read_images_from_local() -> list[IPFSInfo]:
"""
read image ipfs info from the local JSON backup
Returns:
list[IPFSInfo]: images ipfs info
"""
with open("image_ipfs_data.backup", "r") as f:
result: list[dict] = json.loads(f.read())
with open(IPFS_INFO_BACKUP, "r") as f:
result: list[IPFSInfo] = json.load(f)
print(f"read {len(result)} ipfs data from local")
return result


def download_and_save():
def upload_all_in_image_folder(
folder_name: str = IMAGES, content_type: str = "image/png"
) -> list[IPFSInfo]:
"""
upload images and get ipfs info
upload all files in the folder
Args:
folder_name (str, optional): folder name. Defaults to IMAGES.
content_type (str, optional): content_type header; supports png, jpeg, json. Defaults to "image/png".
Returns:
list[dict]: images ipfs info
list[IPFSInfo]: file upload ipfs info
"""
all_ipfs_info_batch = upload_files(IMAGES)
image_ipfs_data = []
for batch_info in all_ipfs_info_batch:
for single_info in batch_info:
image_ipfs_data.append(single_info)
with open("image_ipfs_data.backup", "w") as f:
f.write(json.dumps(image_ipfs_data))
print("save image_ipfs_data to image_ipfs_data.backup")
return image_ipfs_data
extension = content_type.split("/")[-1]
file_paths = [
os.path.join(folder_name, file_path)
for file_path in list(
filter(lambda i: i.split(".")[-1] == extension, os.listdir(folder_name))
)
]
new_file_info = upload_files(file_paths)
# save new ipfs info to local
dump_ipfs_info_list_to_local(new_file_info)
return new_file_info
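# Sketch of the extension filter, assuming IMAGES == "./images" and a folder
# holding ["0.png", "1.png", "attr.csv"]: content_type "image/png" yields
# extension "png", so file_paths == ["./images/0.png", "./images/1.png"] and
# only those two files are passed to upload_files.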


if __name__ == "__main__":
if not PIN_FILES:
print(
f"Pin file is {PIN_FILES}, set PIN_FILES=True in config.py if want to pin files"
)
if os.path.exists("image_ipfs_data.backup"):
use_local = input("image_ipfs_data.backup exist, load from local? (y/n)")
if os.path.exists(IPFS_INFO_BACKUP):
use_local: str = input(f"{IPFS_INFO_BACKUP} exist, load from local? (y/n)")
if use_local == "y":
image_ipfs_data = read_images_from_local()
image_ipfs_data: list[IPFSInfo] = read_images_from_local()
if image_ipfs_data == None:
upload_all_in_image_folder()
else:
# get file names in IMAGES folder
image_names: list[str] = [
file_name
for file_name in os.listdir(IMAGES)
if file_name != "attr.csv"
]
# filter image_names not in image_ipfs_data's Name
images_not_upload: list[str] = list(
filter(
lambda i: i
not in [ipfs_info["Name"] for ipfs_info in image_ipfs_data], # type: ignore
image_names,
)
)
if len(images_not_upload) != 0:
confirm_upload: str = input(
f"found {len(images_not_upload)} files are not uploaded, press 'y' to upload (y/n)"
)
if confirm_upload == "y" or confirm_upload == "Y":
images_path_not_upload = list(
map(lambda i: os.path.join(IMAGES, i), images_not_upload)
)
new_file = upload_files(images_path_not_upload)
image_ipfs_data = dump_ipfs_info_list_to_local(new_file)

else:
exit()
else:
image_ipfs_data = download_and_save()
image_ipfs_data: list[IPFSInfo] = upload_all_in_image_folder()
else:
image_ipfs_data = download_and_save()
image_ipfs_data: list[IPFSInfo] = upload_all_in_image_folder()

df = RENAME_DF
start, end = generate_metadata(df, image_ipfs_data, START_ID)
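# Resume sketch with hypothetical state: if the backup lists Names
# ["0.png", "1.png"] while the IMAGES folder holds
# ["0.png", "1.png", "2.png", "attr.csv"], then images_not_upload == ["2.png"];
# answering 'y' uploads only that file and dump_ipfs_info_list_to_local merges
# its ipfs info into the existing backup before metadata generation continues.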
