forked from xtream1101/humblebundle-downloader
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- caching with json is pretty crazy, so I've switched it to a csv. - added _strtobool - added _strtonone will be useful when converting old json cache - cache object inheriting list. - file operations moved to file_ops.py - readability changes... sorry - changes for consistency across trove and non-trove cache
- Loading branch information
Showing
5 changed files
with
219 additions
and
7 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
from typing import Iterator | ||
from base64 import b16encode | ||
|
||
CSV_CACHE: str = "cache.csv" | ||
|
||
|
||
def make_key(order_id: str, filename: str, trove: bool = False) -> str: | ||
return f"{order_id}:{str(b16encode(str.encode(filename)))[2:-1]}:{str(int(trove))}" | ||
|
||
|
||
def _strtobool(val): | ||
"""Convert a string representation of truth to true (1) or false (0). | ||
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values | ||
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if | ||
'val' is anything else. | ||
""" | ||
if None is val: | ||
return False | ||
val = str(val).lower() | ||
if val in ('y', 'yes', 't', 'true', 'on', '1'): | ||
return 1 | ||
elif val in ('n', 'no', 'f', 'false', 'off', '0', 'none'): | ||
return 0 | ||
else: | ||
raise ValueError("invalid truth value %r" % (val,)) | ||
|
||
|
||
def _strtonone(val): | ||
val = str(val).lower() | ||
if val == 'none': | ||
return None | ||
else: | ||
raise ValueError("value was none 'None' %r" % (val,)) | ||
|
||
class CacheDataJson: | ||
key: str | ||
value: dict | ||
|
||
def __init__(self, key: str, value: dict): | ||
self.key = key | ||
self.value = value | ||
|
||
|
||
class CsvCacheData: | ||
|
||
def __init__(self, order_id: str, | ||
filename: str, | ||
md5: str = None, | ||
remote_modified_date: str = None, | ||
local_modified_date: str = None, | ||
trove: bool = False | ||
): | ||
trove = _strtobool(trove) | ||
self.key = make_key(order_id, filename, trove) | ||
self.order_id = order_id | ||
self.filename = filename | ||
self.md5 = md5 | ||
self.remote_modified_date = remote_modified_date | ||
self.local_modified_date = local_modified_date | ||
self.trove = trove | ||
|
||
def set_remote_modified_date(self, remote_modified_date: str): | ||
self.remote_modified_date = remote_modified_date | ||
|
||
def set_local_modified_date(self, local_modified_date: str): | ||
self.local_modified_date = local_modified_date | ||
|
||
def set_md5(self, md5: str): | ||
self.md5 = md5 | ||
|
||
def __str__(self): | ||
return (f"{self.key},{self.order_id},{str(self.trove)},{self.filename},{self.remote_modified_date}," | ||
f"{self.local_modified_date},{self.md5}") | ||
|
||
def __iter__(self) -> Iterator[str]: | ||
return iter([str(self.order_id), self.filename, str(self.md5), str(self.remote_modified_date), | ||
str(self.local_modified_date), str(self.trove)]) | ||
|
||
def __eq__(self, other): | ||
if other is None: | ||
return False | ||
if not hasattr(other, "key"): | ||
return False | ||
return self.key == other.key | ||
|
||
def __contains__(self, item) -> bool: | ||
if any(item is c_attr or item == c_attr for c_attr in self.__dict__.keys()): | ||
return self[item] is not None | ||
return False | ||
|
||
def __getitem__(self, item): | ||
return self.__dict__[item] | ||
|
||
def __setitem__(self, key, value): | ||
self.__dict__[key] = value | ||
|
||
def __mod__(self, other): | ||
""" | ||
override modulus to give us a has compare, should be fun. | ||
:param other: | ||
:return: | ||
""" | ||
if other is None: | ||
return False | ||
if not hasattr(other, "filename"): | ||
return False | ||
return self.filename == other.filename | ||
|
||
|
||
class Cache(list): | ||
def __init__(self, cache_data: list[CsvCacheData]) -> None: | ||
super().__init__(cache_data) | ||
|
||
def __contains__(self, item): | ||
return any(item is c_data or item == c_data for c_data in self) | ||
|
||
def get_cache_item(self, order_id: str, filename: str, trove: bool = False) -> CsvCacheData: | ||
""" | ||
returns a CsvCacheData, returns the one from the cache, if it is in the cahce, otherwise returns new | ||
CsvCacheData. This function is not enough to see if something is in cache, see is_cached(CsvCacheData) | ||
:param order_id: the order id for the cache item from HumbleBundle | ||
:param filename: the filename for the cache_item from HumbleBundle | ||
:param trove: if in humble trove or not (idk what this is tbh) | ||
:return: CsvCacheData | ||
""" | ||
search = CsvCacheData(order_id, filename, trove=trove) | ||
for cache_data in self: | ||
if search == cache_data: | ||
return cache_data | ||
return search |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import os, csv | ||
import multiprocessing | ||
import logging | ||
# internal import below. | ||
from data.cache import CSV_CACHE, CsvCacheData, Cache | ||
logger = logging.getLogger(__name__) | ||
|
||
_HUMBLE_ENV_VAR = "HUMBLE_LIBRARY_PATH" | ||
def rename_old_file(local_filepath, append_str): | ||
# Check if older file exists, if so rename | ||
if os.path.isfile(local_filepath) is True: | ||
filename_parts = local_filepath.rsplit('.', 1) | ||
new_name = "{name}_{append_str}.{ext}" \ | ||
.format(name=filename_parts[0], | ||
append_str=append_str, | ||
ext=filename_parts[1]) | ||
os.rename(local_filepath, new_name) | ||
logger.info("Renamed older file to {new_name}".format(new_name=new_name)) | ||
|
||
|
||
def download_file(product_r, local_filename, progress_bar=False) -> None: | ||
logger.info(f"Downloading: {os.path.basename(local_filename)} ") | ||
|
||
with open(local_filename, 'wb') as outfile: | ||
total_length = product_r.headers.get('content-length') | ||
if total_length is None: # no content length header | ||
outfile.write(product_r.content) | ||
else: | ||
dl = 0 | ||
total_length = int(total_length) | ||
for data in product_r.iter_content(chunk_size=4096): | ||
dl += len(data) | ||
outfile.write(data) | ||
pb_width = 50 | ||
done = int(pb_width * dl / total_length) | ||
if progress_bar: | ||
print("\t{percent}% [{filler}{space}]" # this is nice. | ||
.format(percent=int(done * (100 / pb_width)), | ||
filler='=' * done, | ||
space=' ' * (pb_width - done), | ||
), end='\r') | ||
if dl != total_length: | ||
raise ValueError("Download did not complete") | ||
|
||
|
||
def update_csv_cache(queue: multiprocessing.JoinableQueue): | ||
""" | ||
use csv because json as on-disk data is wild. | ||
:param queue: the queue containing cache data | ||
""" | ||
csv_filepath = os.path.join(get_library_path(), CSV_CACHE) | ||
with open(csv_filepath, 'a+') as outfile: | ||
while 1: | ||
try: | ||
cache_data: CsvCacheData = queue.get(True, 15.0) | ||
except: | ||
pass | ||
if "kill" == cache_data.key: | ||
queue.task_done() | ||
break | ||
|
||
csv.writer(outfile, delimiter=',', quotechar='"').writerow(cache_data) | ||
outfile.flush() | ||
queue.task_done() # need 1 per queue.get | ||
|
||
|
||
def load_cache_csv() -> Cache: | ||
try: | ||
csv_filepath = os.path.join(get_library_path(), CSV_CACHE) | ||
with open(csv_filepath, 'r') as cache_in: | ||
csv_stream = csv.reader(cache_in) | ||
cache_out = Cache([CsvCacheData(*row) for row in csv_stream]) | ||
except FileNotFoundError: | ||
cache_out = Cache([]) | ||
return cache_out | ||
|
||
|
||
def create_product_folder(bundle_title: str, product_title: str) -> str: | ||
product_folder = os.path.join(get_library_path(), bundle_title, product_title) | ||
os.makedirs(product_folder, exist_ok=True) | ||
return product_folder | ||
|
||
|
||
def set_library_path(library_path: str): | ||
os.environ[_HUMBLE_ENV_VAR] = library_path | ||
|
||
|
||
def get_library_path(): | ||
return os.environ[_HUMBLE_ENV_VAR] |