-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
99 lines (73 loc) · 2.59 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import pika
import json
import requests
from PyPDF2 import PdfReader
from PIL import Image
from dotenv import load_dotenv
# Let python-dotenv populate the environment before any setting is read.
load_dotenv()

# Broker host plus the names of the outbound and inbound exchanges.
HOST = os.environ.get("HOST")
SENDER_QUEUE_NAME = os.environ.get("SENDER_QUEUE_NAME")
CONSUMER_QUEUE_NAME = os.environ.get("CONSUMER_QUEUE_NAME")
print(HOST, SENDER_QUEUE_NAME, CONSUMER_QUEUE_NAME)

# Accepted content types mapped to the extension used for the temp file.
mimeTypesExtension = {
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/gif": "gif",
    "application/pdf": "pdf",
}

# One blocking connection/channel and one HTTP session are shared by the
# whole worker process.
connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST))
rmq_channel = connection.channel()
session = requests.Session()

# Results are published to a durable topic exchange named after
# SENDER_QUEUE_NAME, so make sure it exists before consuming.
rmq_channel.exchange_declare(
    exchange=SENDER_QUEUE_NAME,
    exchange_type='topic',
    durable=True,
)
def download_image(session: "requests.Session", url: str, path: str) -> bool:
    """Download *url* into the file at *path*.

    The body is streamed to disk in chunks so large files are never held
    in memory as a whole, and the response is always released back to the
    session's connection pool.

    Args:
        session: HTTP session used to issue the GET request.
        url: Address of the file to fetch.
        path: Destination file path; overwritten if it already exists.

    Returns:
        True when the server answered 200 and the body was written,
        False for any other status code (nothing is written in that case).

    Raises:
        Whatever ``session.get`` raises for transport errors or timeouts,
        and ``OSError`` if *path* cannot be written.
    """
    # A timeout keeps a stalled server from blocking the consumer forever.
    with session.get(url, stream=True, timeout=30) as r:
        if r.status_code != 200:
            return False
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=64 * 1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    return True
def send(message: str) -> None:
    """Echo *message* to stdout, then publish it on the sender exchange.

    The module-level channel is used, with SENDER_QUEUE_NAME serving as
    both the exchange name and the routing key.
    """
    print(message)
    rmq_channel.basic_publish(
        exchange=SENDER_QUEUE_NAME,
        routing_key=SENDER_QUEUE_NAME,
        body=message,
    )
def _count_pages(path, mime_type):
    """Return how many pages/frames the downloaded file at *path* holds."""
    if mime_type == "application/pdf":
        return len(PdfReader(path).pages)
    if mime_type == "image/gif":
        with Image.open(path) as im:
            # Fall back to a single frame if the opened image exposes no
            # frame count.
            return getattr(im, "n_frames", 1)
    # Any other accepted type is a single still image.
    return 1


def on_message(channel, method_frame, header_frame, body) -> None:
    """Handle one upload request: count its pages and publish the result.

    *body* is a JSON document with ``sourceUploadID`` and ``fileInfos``;
    each entry of ``fileInfos`` carries a ``url`` and a
    ``fileInfo.contentType``. Every file is downloaded into
    ``tmp/<sourceUploadID>/``, inspected and deleted again. PDFs contribute
    their page count, GIFs their frame count, other types count as one.

    The message is acknowledged before processing starts, so a request
    that fails is not redelivered. If any download does not return HTTP
    200 the request is dropped without publishing a result.

    Raises:
        KeyError: if a required key is missing or the content type is not
            in ``mimeTypesExtension``.
    """
    rmq_channel.basic_ack(method_frame.delivery_tag)

    req = json.loads(body)
    upload_id = req["sourceUploadID"]
    file_infos = req["fileInfos"]

    work_dir = f"tmp/{upload_id}"
    os.makedirs(work_dir, exist_ok=True)
    pages = 0
    try:
        for idx, info in enumerate(file_infos):
            url = info["url"]
            mime_type = info["fileInfo"]["contentType"]
            path = f"{work_dir}/{idx}.{mimeTypesExtension[mime_type]}"
            try:
                if not download_image(session, url, path):
                    return None
                pages += _count_pages(path, mime_type)
            finally:
                # Never leave a (possibly partial) download behind, even
                # when the download or the parser raised.
                if os.path.exists(path):
                    os.remove(path)
    finally:
        # Runs on success, early return and error alike so the working
        # directory does not accumulate.
        os.removedirs(work_dir)

    res = {
        "sourceUploadID": upload_id,
        "data": {
            "pages": pages,
            # NOTE(review): with "> 0" a single still image is reported as
            # "SEQUENCE"; "> 1" may have been intended - confirm with the
            # consumer of this message before changing.
            "type": "SEQUENCE" if pages > 0 else "IMAGE"
        }
    }
    send(json.dumps(res, separators=(',', ':'), ensure_ascii=False))
# Declare an exclusive, server-named queue and bind it to the inbound
# exchange so this worker receives the upload requests.
result = rmq_channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
rmq_channel.queue_bind(
    exchange=CONSUMER_QUEUE_NAME,
    queue=queue_name,
    routing_key=CONSUMER_QUEUE_NAME,
)
rmq_channel.basic_consume(
    queue=queue_name, on_message_callback=on_message, auto_ack=False)

try:
    # Blocks until interrupted or until the callback raises.
    rmq_channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the worker; shut down quietly.
    pass
finally:
    # Release broker and HTTP resources on every exit path. The is_open
    # guards avoid raising a second error when the broker link is already
    # gone, which would hide the original exception.
    if rmq_channel.is_open:
        rmq_channel.close()
    if connection.is_open:
        connection.close()
    session.close()