Skip to content

Commit

Permalink
Removed an unstable feature and worked on compatibility.
Browse files Browse the repository at this point in the history
1. Removed the clean cover feature as unable to delete the orphan cover accurately.
2. Made the package for Mac compatible with Mac's m1 and m2 chips.
  • Loading branch information
bookfere committed Dec 7, 2022
1 parent 295b0b1 commit 4dba008
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 166 deletions.
170 changes: 76 additions & 94 deletions FixCover.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import re
import sys
import time
import glob
import imghdr
import string
import tempfile
import sqlite3
from pathlib import Path

Expand All @@ -19,10 +14,9 @@ class FixCover:
name = 'Fix Kindle Ebook Cover'
version = '1.2'
feedback = 'https://bookfere.com/post/994.html'
description = '%s - v%s\nA tool to fix damaged Kindle ebook covers.\n \
description = '%s - v%s\nA tool to fix damaged Kindle ebook covers.\n\
Feedback: %s' % (name, version, feedback)


def __init__(self, logger=None, progress=None, db=None):
self.logger = logger
self.progress = progress
Expand All @@ -43,24 +37,20 @@ def __init__(self, logger=None, progress=None, db=None):
self.log('Time: %s' % time.strftime('%Y-%m-%d %H:%M:%S'))
self.log(self.description, True)


def log(self, text, sep=False):
if self.logger is not None:
divider = '-------------------------------------------'
text = '%s\n%s\n%s' % (divider, text, divider) \
text = '%s\n%s\n%s' % (divider, text, divider) \
if sep is True else text
self.logger(text)


def print_progress(self, factor):
if self.progress is not None:
self.progress(factor)


def get_filepath_list(self, path):
return glob.glob('%s%s**' % (path, os.sep), recursive=True)


def get_ebook_thumbnails_via_path(self, path):
thumbnails = dict()
for thumbnail in self.get_filepath_list(path):
Expand All @@ -72,7 +62,6 @@ def get_ebook_thumbnails_via_path(self, path):
thumbnails[asin.group(1)] = thumbnail
return thumbnails


def get_ebook_thumbnails_via_db(self):
# self.db_cursor.row_factory = lambda cursor, row: row[0]
thumbnails = self.db_cursor.execute(' \
Expand All @@ -81,14 +70,12 @@ def get_ebook_thumbnails_via_db(self):
AND p_location IS NOT NULL')
return [row[0] for row in thumbnails.fetchall()]


def is_damaged_thumbnail(self, path):
try:
return os.path.getsize(path) < 2000
except:
except Exception:
return False


def get_damaged_thumbnails(self, path):
thumbnails = self.get_ebook_thumbnails_via_path(path)
for thumbnail in thumbnails.copy():
Expand All @@ -97,14 +84,12 @@ def get_damaged_thumbnails(self, path):
del thumbnails[thumbnail]
return thumbnails


def is_valid_ebook_file(self, filename):
for ext in ['.mobi', '.azw', '.azw3', 'azw4']:
if filename.endswith(ext):
return True
return False


def get_ebook_list_via_path(self, path):
ebook_list = []
for filename in self.get_filepath_list(path):
Expand All @@ -114,49 +99,41 @@ def get_ebook_list_via_path(self, path):
ebook_list.append(filename)
return ebook_list


def get_ebook_asisn_from_filename(self, filename):
asin = re.search(
'_([A-Z0-9]{10})\.(?:kfx|azw\d{0,1}|prc|[mp]obi)$',
r'_([\w-]*)\.(?:kfx|azw\d{0,1}|prc|[mp]obi)$',
filename
)
if asin is not None:
self.guessed_asins.append(asin.group(1))


def get_ebook_list_via_db(self):
ebook_list = self.db_cursor.execute(" \
SELECT p_uuid, p_location, p_thumbnail, p_cdeType FROM Entries \
WHERE p_cdeType IN ('PDOC', 'EBOK') \
AND p_location IS NOT NULL")
return ebook_list.fetchall()


def store_ebook_thumbnail(self, path, data):
with open(path, 'wb') as file:
file.write(data)


def get_ebook_metadata(self, path):
ebook_asin = None
ebook_type = None
ebook_cover = None
asin = cdetype = cover = None

try:
mobi_file = MOBIFile(path)
ebook_asin = mobi_file.get_metadata('ASIN')
ebook_type = mobi_file.get_metadata('Document Type')
ebook_cover = mobi_file.get_cover_image()
except:
asin = mobi_file.get_metadata('ASIN')
cdetype = mobi_file.get_metadata('Document Type')
cover = mobi_file.get_cover_image()
except Exception:
pass

return (ebook_asin, ebook_type, ebook_cover)

return (asin, cdetype, cover)

def get_thumbnail_name(self, asin, cdetype):
return 'thumbnail_%s_%s_portrait.jpg' % (asin, cdetype)


def fix_via_db(self, thumbnails_path):
for row in self.get_ebook_list_via_db():
p_uuid, p_location, p_thumbnail, p_cde = row
Expand All @@ -166,75 +143,83 @@ def fix_via_db(self, thumbnails_path):
asin, cde, cover = self.get_ebook_metadata(p_location)

if p_location.endswith('KUAL.kual'):
cover = Path(os.path.join(os.path.dirname(__file__),
'kual.jpg')).read_bytes()
cover = Path(
os.path.join(os.path.dirname(__file__), 'kual.jpg')
).read_bytes()
elif not self.is_valid_ebook_file(p_location):
continue
elif cover is None:
self.failure_jobs['ebook_errors'].append('%s\n └─[%s] %s' %
('No cover was found.', p_cde, Path(p_location).name))
self.failure_jobs['ebook_errors'].append(
'%s\n └─[%s] %s' %
('No cover was found.', p_cde, Path(p_location).name)
)
continue

if p_thumbnail is None and p_cde in ('EBOK', 'PDOC'):
asin = asin if asin is not None else p_uuid
cde = cde if cde is not None else p_cde
thumbnail_path = os.path.join(thumbnails_path,
self.get_thumbnail_name(asin, cde))
thumbnail_path = os.path.join(
thumbnails_path,
self.get_thumbnail_name(asin, cde)
)
self.store_ebook_thumbnail(thumbnail_path, cover)
self.db_cursor.execute('UPDATE Entries SET p_thumbnail = ? \
WHERE p_location = ?', (thumbnail_path, p_location))
self.log('✓ Generated: %s\n └─[%s] %s' %
(Path(thumbnail_path).name, p_cde, Path(p_location).name))
elif p_thumbnail is not None and (not os.path.exists(p_thumbnail)
or self.is_damaged_thumbnail(p_thumbnail)):
self.store_ebook_thumbnail(p_thumbnail, cover)
self.log('✓ Fixed: %s\n └─[%s] %s' %
(Path(p_thumbnail).name, p_cde, Path(p_location).name))

self.log(
'✓ Generated: %s\n └─[%s] %s' %
(Path(thumbnail_path).name, p_cde, Path(p_location).name)
)
elif p_thumbnail is not None and (
not os.path.exists(p_thumbnail)
or self.is_damaged_thumbnail(p_thumbnail)
):
self.store_ebook_thumbnail(p_thumbnail, cover)
self.log(
'✓ Fixed: %s\n └─[%s] %s' %
(Path(p_thumbnail).name, p_cde, Path(p_location).name)
)

def fix_via_path(self, thumbnails, documents_path, thumbnails_path):
ebook_list = self.get_ebook_list_via_path(documents_path)
for ebook in ebook_list:
self.print_progress(len(ebook_list))

ebook_asin, ebook_type, ebook_cover = self.get_ebook_metadata(ebook)
asin, cdetype, cover = self.get_ebook_metadata(ebook)
ebook = Path(ebook)

if ebook_cover is None:
self.failure_jobs['ebook_errors'].append('%s\n └─[%s] %s' %
('No cover was found.', ebook_type, ebook.name))
if cover is None:
self.failure_jobs['ebook_errors'].append(
'%s\n └─[%s] %s' %
('No cover was found.', cdetype, ebook.name)
)
continue

if ebook_type == 'EBOK' and ebook_asin in thumbnails.keys():
thumbnail_path = thumbnails[ebook_asin]
if cdetype == 'EBOK' and asin in thumbnails.keys():
thumbnail_path = thumbnails[asin]
thumbnail_name = Path(thumbnail_path).name
self.store_ebook_thumbnail(thumbnail_path, ebook_cover)
self.log('✓ Fixed: %s\n └─[%s] %s' %
(thumbnail_name, ebook_type, ebook.name))
self.store_ebook_thumbnail(thumbnail_path, cover)
self.log(
'✓ Fixed: %s\n └─[%s] %s' %
(thumbnail_name, cdetype, ebook.name)
)
self.conquest_jobs += 1
del thumbnails[ebook_asin]
elif ebook_type == 'EBOK' and ebook_asin is not None:
thumbnail_name = self.get_thumbnail_name(ebook_asin, ebook_type)
del thumbnails[asin]
elif cdetype == 'EBOK' and asin is not None:
thumbnail_name = self.get_thumbnail_name(asin, cdetype)
thumbnail_path = os.path.join(thumbnails_path, thumbnail_name)
if not os.path.exists(thumbnail_path):
self.store_ebook_thumbnail(thumbnail_path, ebook_cover)
self.log('✓ Generated: %s\n └─[%s] %s' %
(thumbnail_name, ebook_type, ebook.name))
self.store_ebook_thumbnail(thumbnail_path, cover)
self.log(
'✓ Generated: %s\n └─[%s] %s' %
(thumbnail_name, cdetype, ebook.name)
)
self.conquest_jobs += 1
# [BUG] Do this will make Kindle can not open ebook.
# elif ebook_type == 'PDOC' and ebook.suffix == '.azw3':
# target = ebook.with_suffix('.mobi')
# ebook.rename(target)
# self.log(
# '✓ Rename %s -> %s to show cover.\n └─[%s] %s' %
# (ebook.suffix, target.suffix, ebook_type, target.name)
# )

self.failure_jobs['cover_errors'] = [Path(thumbnail).name for
thumbnail in thumbnails.values()]

self.print_progress(0)
self.failure_jobs['cover_errors'] = [
Path(thumbnail).name for thumbnail in thumbnails.values()
]

self.print_progress(0)

def fix_ebook_thumbnails(self, documents_path, thumbnails_path):
self.log('Checking damaged ebook covers:', True)
Expand Down Expand Up @@ -281,7 +266,6 @@ def fix_ebook_thumbnails(self, documents_path, thumbnails_path):
else:
self.log('- No ebook cover need to fix.')


def clean_orphan_thumbnails(self, documents_path, thumbnails_path):
self.log('Analysing orphan ebook covers:', True)

Expand All @@ -291,18 +275,21 @@ def clean_orphan_thumbnails(self, documents_path, thumbnails_path):
thumbnails = set(thumbnails.values()) \
- set(self.get_ebook_thumbnails_via_db())
else:
ebook_list = self.get_ebook_list_via_path(documents_path)
for ebook in ebook_list:
self.print_progress(len(ebook_list))
ebook_asin, ebook_type, ebook_cover = self.get_ebook_metadata(ebook)
if ebook_type == 'EBOK' and ebook_asin in thumbnails.keys():
del thumbnails[ebook_asin]

for asin in self.guessed_asins:
if asin in thumbnails.keys():
del thumbnails[asin]

thumbnails = thumbnails.values()
self.log(
'This feature Removed due to impossible to delete the orphan'
'thumbnails perfectly.'
)
return
# ebook_list = self.get_ebook_list_via_path(documents_path)
# for ebook in ebook_list:
# self.print_progress(len(ebook_list))
# asin, cdetype, cover = self.get_ebook_metadata(ebook)
# if cdetype == 'EBOK' and asin in thumbnails.keys():
# del thumbnails[asin]
# for asin in self.guessed_asins:
# if asin in thumbnails.keys():
# del thumbnails[asin]
# thumbnails = thumbnails.values()

self.print_progress(0)

Expand All @@ -317,21 +304,18 @@ def clean_orphan_thumbnails(self, documents_path, thumbnails_path):

self.log('✓ All orphan ebook covers deleted.')


def get_kindle_path(self, path):
return (
os.path.join(path, 'documents'),
os.path.join(path, 'system', 'thumbnails')
)


def is_kindle_root(self, path):
for path in self.get_kindle_path(path):
if os.path.exists(path) is False:
return False
return True


def get_kindle_root_automatically(self):
drives = []
roots = []
Expand All @@ -349,7 +333,6 @@ def get_kindle_root_automatically(self):

return roots


# fix|clean
def handle(self, action='fix', roots=[]):
if not sys.version_info >= (3, 5):
Expand All @@ -360,7 +343,8 @@ def handle(self, action='fix', roots=[]):
return

roots = [roots] if type(roots) != list else roots
roots = self.get_kindle_root_automatically() if len(roots) < 1 else roots
roots = self.get_kindle_root_automatically() \
if len(roots) < 1 else roots

if len(roots) < 1:
self.log('You need choose a Kindle root directory first.')
Expand Down Expand Up @@ -388,8 +372,6 @@ def handle(self, action='fix', roots=[]):
return

self.log('All jobs done.', True)
self.log('\n')


def __del__(self):
if (self.db_access):
Expand Down
24 changes: 16 additions & 8 deletions fix_kindle_ebook_cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=FixCover.description,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('path', metavar='N', nargs='*', default=[],
help='Kindle root directories (optional)')
parser.add_argument('-a', '--action', dest='action',
parser = argparse.ArgumentParser(
description=FixCover.description,
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'path', metavar='N', nargs='*', default=[],
help='Kindle root directories (optional)',
)
parser.add_argument(
'-a', '--action', dest='action',
default='fix', choices=['fix', 'clean'],
help='Specify an action to process ebook cover (default: fix)')
parser.add_argument('-d', '--db', dest='database',
default=None, help='Specify a sqlite3 database file.')
help='Specify an action to process ebook cover (default: fix)',
)
parser.add_argument(
'-d', '--db', dest='database',
default=None, help='Specify a sqlite3 database file.',
)

args = parser.parse_args()

Expand Down
Loading

0 comments on commit 4dba008

Please sign in to comment.