forked from davy39/qbfrench
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcpasbien.py
154 lines (126 loc) · 5.2 KB
/
cpasbien.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- coding: utf-8 -*-
# VERSION: 2.0
# AUTHOR: Davy39 <[email protected]>, Paolo M
# CONTRIBUTORS: Simon <[email protected]>
# Copyleft
from __future__ import print_function
import gzip
import io
import json
import os
import re
import tempfile
import urllib
import urllib.error
import urllib.request
from html.parser import HTMLParser

from helpers import retrieve_url, headers, download_file
from novaprinter import prettyPrinter
class cpasbien(object):
    """Search-engine plugin for the French tracker Cpasbien.

    The class follows the nova/qBittorrent plugin layout used by this file:
    it relies on ``helpers`` (``retrieve_url``, ``headers``, ``download_file``)
    for HTTP access and on ``novaprinter.prettyPrinter`` to emit results.

    Attributes:
        url: Fixed engine identifier, copied into every result as
            ``engine_url``.
        name: Human-readable engine name.
        supported_categories: Mapping of category name to site parameter.
        real_url: Base address actually queried, resolved by ``find_url``.
    """

    # This is a fake url only for engine associations in file download
    url = "http://www.cpasbien.fr"
    name = "Cpasbien (french)"
    supported_categories = {
        "all": [""]
    }

    # JSON document listing the current mirror address(es) of the site.
    _URLS_INDEX = "https://raw.githubusercontent.com/menegop/qbfrench/master/urls.json"
    # Mirror used whenever the index above cannot be fetched or parsed.
    _FALLBACK_URL = "http://www.cpasbien.si"
    # Matches the relative .torrent link on a description page.
    _TORRENT_LINK_RE = re.compile(r"<a href='(/get_torrents/.*?)'>")

    def __init__(self):
        """Resolve the site's current base address (performs a network call)."""
        self.real_url = self.find_url()

    def find_url(self):
        """Return the site's base url, read from a remote JSON index.

        The index is fetched from a GitHub repository so the plugin keeps
        working when the site moves. Any failure (network error, malformed
        JSON, missing ``'cpasbien'`` entry) is reported on stdout and the
        hard-coded fallback mirror is returned instead.

        Returns:
            The base url as a string.
        """
        try:
            req = urllib.request.Request(self._URLS_INDEX, headers=headers)
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(req) as response:
                content = response.read().decode()
            mirror = json.loads(content)['cpasbien'][0]
            if isinstance(mirror, str) and mirror:
                return mirror
            print("Invalid url list: unexpected 'cpasbien' entry")
        except urllib.error.URLError as err:
            print(" ".join(("Connection error:", str(err.reason))))
        except (ValueError, LookupError, TypeError) as err:
            # ValueError covers JSON and unicode decoding errors,
            # LookupError covers a missing key or an empty list.
            print(" ".join(("Invalid url list:", str(err))))
        return self._FALLBACK_URL

    def download_torrent(self, desc_link):
        """Download the torrent referenced by a description page.

        The description page is fetched, the ``/get_torrents/...`` link is
        extracted from it and handed to ``download_file``; whatever that
        helper returns is printed to stdout.

        Args:
            desc_link: Url of the torrent's description page.

        Returns:
            ``""`` when the page cannot be fetched or holds no torrent link,
            ``None`` otherwise.
        """
        req = urllib.request.Request(desc_link, headers=headers)
        try:
            with urllib.request.urlopen(req) as response:
                # Undecodable bytes must not prevent locating the ASCII link.
                content = response.read().decode("utf-8", errors="replace")
        except urllib.error.URLError as err:
            print(" ".join(("Connection error:", str(err.reason))))
            return ""
        match = self._TORRENT_LINK_RE.search(content)
        if match is None:
            print(" ".join(("Torrent link not found:", str(desc_link))))
            return ""
        print(download_file(self.real_url + match.group(1)))

    class TableRowExtractor(HTMLParser):
        """Collect one result dict per ``<tr>`` of the ``table-corps`` table.

        Inside a row, the text of ``<div>``/``<a>`` elements whose ``class``
        is ``titre``, ``poid``, ``up`` or ``down`` is stored under ``name``,
        ``size``, ``seeds`` or ``leech``. The ``href`` of the ``titre`` anchor,
        prefixed with ``url``, becomes both ``link`` and ``desc_link``. Rows
        without a link, or whose link was already collected, are dropped.
        """

        def __init__(self, url, results):
            """Create a parser appending its rows to ``results``.

            Args:
                url: Base address prepended to every relative ``href``.
                results: List extended in place; it may already hold rows
                    from previous pages, which take part in de-duplication.
            """
            super().__init__()
            self.results = results
            self.map_name = {'titre': 'name', 'poid': 'size', 'up': 'seeds', 'down': 'leech'}
            self.in_tr = False
            self.in_table_corps = False
            self.in_div_or_anchor = False
            # Result key the current element's text belongs to, if any.
            self.current_div_class = None
            self.current_row = {}
            self.url = url
            # Links already present, for O(1) duplicate detection.
            self._seen_links = {
                row['desc_link'] for row in results if 'desc_link' in row
            }

        def handle_starttag(self, tag, attrs):
            """Track table/row state and start capturing a mapped element."""
            attrs = dict(attrs)
            if tag == 'table' and attrs.get('class') == 'table-corps':
                self.in_table_corps = True
            if self.in_table_corps and tag == 'tr':
                self.in_tr = True
            if self.in_tr and tag in ('div', 'a'):
                self.in_div_or_anchor = True
                # None when the element carries no class of interest.
                self.current_div_class = self.map_name.get(attrs.get('class'))
                href = attrs.get('href')
                # An anchor without href yields no link; the row is then
                # discarded at </tr> instead of raising here.
                if tag == 'a' and self.current_div_class == 'name' and href:
                    self.current_row['link'] = self.url + href
                    self.current_row["desc_link"] = self.url + href

        def handle_endtag(self, tag):
            """Store the finished row on ``</tr>`` and unwind state flags."""
            if tag == 'tr':
                link = self.current_row.get('desc_link')
                if self.in_table_corps and link is not None and link not in self._seen_links:
                    self._seen_links.add(link)
                    self.results.append(self.current_row)
                self.in_tr = False
                self.current_row = {}
            if tag == 'table':
                self.in_table_corps = False
            if tag in ('div', 'a'):
                self.in_div_or_anchor = False

        def handle_data(self, data):
            """Record element text under the key selected by the start tag."""
            if not (self.in_div_or_anchor and self.current_div_class):
                return
            text = data.strip()
            # Whitespace-only nodes (markup indentation) must not overwrite
            # a value that was already captured for this field.
            if text:
                self.current_row[self.current_div_class] = text

        def get_rows(self):
            """Return the list the rows were appended to."""
            return self.results

    @staticmethod
    def _as_int(value):
        """Convert ``value`` to int, returning -1 when it is not a number."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return -1

    @classmethod
    def _rank_by_peers(cls, results):
        """Return ``results`` ordered by seeds, then leechers, descending.

        Ties are broken by position, later rows first. Missing or
        non-numeric counts rank as -1 instead of raising.
        """
        ranked = sorted(
            enumerate(results),
            key=lambda item: (
                cls._as_int(item[1].get('seeds')),
                cls._as_int(item[1].get('leech')),
                item[0],
            ),
            reverse=True,
        )
        return [row for _, row in ranked]

    def search(self, what, cat="all"):
        """Query the site and print every result through ``prettyPrinter``.

        Up to ten result pages are fetched; paging stops early when a page
        fails to load or parse, or contributes no new row.

        Args:
            what: Search terms, inserted verbatim in the request path
                (presumably already url-encoded by the caller; verify).
            cat: Category name; unused, the site is searched as a whole.
        """
        results = []
        for page in range(10):
            # The path's last segment is a 1-based offset advancing by 50.
            url = f"{self.real_url}/recherche/{what}/{page * 50 + 1}"
            found_before = len(results)
            try:
                parser = self.TableRowExtractor(self.real_url, results)
                parser.feed(retrieve_url(url))
                parser.close()
            except Exception:
                # Best effort: keep what was collected so far. Unlike a bare
                # ``except``, this lets KeyboardInterrupt/SystemExit through.
                break
            if len(results) == found_before:
                break
        for res in self._rank_by_peers(results):
            # NOTE(review): '-1' is assumed to be the printer's placeholder
            # for unknown values; confirm against novaprinter.
            res.setdefault('name', '')
            res.setdefault('seeds', '-1')
            res.setdefault('leech', '-1')
            # Fix size and add engine
            res['size'] = unit_fr2en(res.get('size', '-1'))
            res["engine_url"] = self.url
            prettyPrinter(res)
def unit_fr2en(size):
    """Rewrite French byte units as English ones (``Go`` -> ``GB``).

    Every K, M, G, T or P (either case) directly followed by an ``o``
    (either case) keeps its prefix letter and has the ``o`` replaced by an
    upper-case ``B``. Text without such a unit is returned unchanged.
    """
    french_unit = re.compile(r'([KMGTP])o', re.IGNORECASE)
    # \1 re-emits the prefix letter exactly as it was written.
    return french_unit.sub(r'\1B', size)