forked from Wikidepia/InstaFix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
instafix.py
462 lines (392 loc) · 15.1 KB
/
instafix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
import asyncio
import json
import os
import random
import re
import time
from typing import Optional
from urllib.parse import urlencode, urljoin
import esprima
import httpx
import pyvips
import sentry_sdk
import tenacity
from diskcache import Cache
from fastapi import FastAPI, Request
from fastapi.responses import (FileResponse, HTMLResponse, ORJSONResponse,
PlainTextResponse, RedirectResponse)
from fastapi.templating import Jinja2Templates
from selectolax.lexbor import LexborHTMLParser
# Set every pyvips operation-cache limit (entry count, memory, open files)
# to zero.
pyvips.cache_set_max(0)
pyvips.cache_set_max_mem(0)
pyvips.cache_set_max_files(0)

# Generated grid images are written into this directory.
os.makedirs("static", exist_ok=True)

NoneType = type(None)

# Error reporting is optional and only enabled when a DSN is configured.
if "SENTRY_DSN" in os.environ:
    sentry_sdk.init(dsn=os.environ["SENTRY_DSN"], sample_rate=0.5)
    print("Sentry initialized.")

# Announce which of the optional proxies are configured.
for _proxy_env, _proxy_label in (
    ("EMBED_PROXY", "Using proxy:"),
    ("GRAPHQL_PROXY", "Using GraphQL proxy:"),
    ("WORKER_PROXY", "Using worker proxy:"),
):
    if _proxy_env in os.environ:
        print(_proxy_label, os.environ[_proxy_env])

app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Browser-like default headers handed to the HTTP clients created at startup.
headers = {
    "authority": "www.instagram.com",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "max-age=0",
    "sec-fetch-mode": "navigate",
    "upgrade-insecure-requests": "1",
    "referer": "https://www.instagram.com/",
    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36",
    "viewport-width": "1280",
}
async def get_data(post_id: str) -> Optional[dict]:
    """Return the data for *post_id*, consulting the disk cache first.

    A falsy cache result triggers a fresh fetch through ``_get_data``; the
    fetched value is stored for 24 hours. If the resulting dict carries a
    ``"data"`` key, the value under that key is returned instead of the
    wrapper.
    """
    store = app.state.cache
    result = store.get(post_id)
    if result:
        return result.get("data", result)

    result = await _get_data(post_id)
    one_day = 24 * 60 * 60
    store.set(post_id, result, expire=one_day)
    return result.get("data", result)
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_fixed(1))
async def _get_data(post_id: str) -> Optional[dict]:
    """Fetch post data for *post_id*, trying several sources in order.

    The captioned embed page is downloaded once and inspected for, in order:
    the ``__additionalDataLoaded`` payload, the ``TimeSliceImpl`` script, and
    finally the rendered HTML itself. When the HTML result is an error or is
    marked as video-blocked, the JSON-LD of the post page is tried, then the
    GraphQL endpoint (only if ``GRAPHQL_PROXY`` is set). If everything fails
    the HTML-derived result is returned as-is.

    Any exception raised here makes tenacity retry the whole function
    (3 attempts, 1 second apart).
    """
    response = await app.state.client.get(
        f"https://www.instagram.com/p/{post_id}/embed/captioned",
    )
    page = response.text

    # Source 1: additionalDataLoaded
    additional = re.search(
        r"window\.__additionalDataLoaded\('extra',(.*)\);<\/script>", page
    )
    if additional is not None:
        payload = json.loads(additional.group(1))
        if payload and payload.get("shortcode_media"):
            return payload

    # Source 2: TimeSliceImpl
    time_slice = re.search(
        r'<script>(requireLazy\(\["TimeSliceImpl".*)<\/script>', page
    )
    if time_slice is not None and "shortcode_media" in time_slice.group(1):
        for token in esprima.tokenize(time_slice.group(1)):
            if "shortcode_media" in token.value:
                # The token is a JSON string literal that itself contains
                # JSON, hence the double decode.
                return json.loads(json.loads(token.value))["gql_data"]

    # Source 3: the embed HTML itself
    embed_data = parse_embed(page)
    embed_usable = (
        "error" not in embed_data
        and not embed_data["shortcode_media"]["video_blocked"]
    )
    if embed_usable:
        return embed_data

    # Source 4: JSON-LD of the post page (used when the video is blocked)
    ld_data = await parse_json_ld(post_id)
    if "error" not in ld_data:
        return ld_data

    # Source 5: GraphQL, only when a dedicated proxy is configured
    if "GRAPHQL_PROXY" in os.environ:
        gql_response = await query_gql(post_id)
        if gql_response.get("status") == "ok":
            return gql_response["data"]

    return embed_data
def parse_embed(html: str) -> dict:
    """Extract post data from the HTML of an Instagram embed page.

    Args:
        html: Markup of the ``/embed/captioned`` page.

    Returns:
        ``{"error": "Not found"}`` when neither an ``.EmbeddedMediaImage``
        element nor a ``<video>`` element with a ``src`` is present.
        Otherwise a dict with a single ``"shortcode_media"`` entry holding
        the owner username, the media type and URL, the caption text,
        empty dimensions and a ``video_blocked`` flag.
    """
    tree = LexborHTMLParser(html)

    typename = "GraphImage"
    media_node = tree.css_first(".EmbeddedMediaImage")
    if media_node is None:
        typename = "GraphVideo"
        media_node = tree.css_first("video")
    if media_node is None:
        return {"error": "Not found"}

    # A media element without a usable src cannot be embedded; report it the
    # same way as a missing element instead of raising KeyError.
    display_url = media_node.attributes.get("src")
    if not display_url:
        return {"error": "Not found"}

    # css_first returns None when nothing matches; fall back to a
    # placeholder rather than crashing on ``None.text()``.
    username_node = tree.css_first(".UsernameText")
    username = username_node.text() if username_node is not None else "unknown"

    # Remove div class CaptionComments, CaptionUsername so that only the
    # caption body remains in the text extracted below.
    for selector in (".CaptionComments", ".CaptionUsername"):
        unwanted = tree.css_first(selector)
        if unwanted is not None:
            unwanted.remove()

    caption_text = ""
    caption = tree.css_first(".Caption")
    if caption is not None:
        # Keep line breaks: <br> would otherwise vanish from .text().
        for node in caption.css("br"):
            node.replace_with("\n")
        caption_text = caption.text().strip()

    return {
        "shortcode_media": {
            "owner": {"username": username},
            "node": {"__typename": typename, "display_url": display_url},
            "edge_media_to_caption": {"edges": [{"node": {"text": caption_text}}]},
            "dimensions": {"height": None, "width": None},
            "video_blocked": "WatchOnInstagram" in html,
        }
    }
async def parse_json_ld(post_id: str) -> dict:
    """Build post data from the JSON-LD block of the public post page.

    Args:
        post_id: Shortcode of the post.

    Returns:
        ``{"error": ...}`` when the page does not answer with HTTP 200, has
        no ``application/ld+json`` script, or the script does not decode to
        a JSON object. Otherwise a dict with a ``"shortcode_media"`` entry
        containing the owner username, the caption and an
        ``edge_sidecar_to_children`` list with one node per video and image
        (videos first).
    """
    client = app.state.client
    resp = await client.get(f"https://www.instagram.com/p/{post_id}/")
    if resp.status_code != 200:
        return {"error": "Not found"}

    tree = LexborHTMLParser(resp.text)
    json_ld_node = tree.css_first("script[type='application/ld+json']")
    if not json_ld_node:
        return {"error": "Server is blocked from Instagram"}

    json_ld = json.loads(json_ld_node.text())
    if isinstance(json_ld, list):
        json_ld = json_ld[0] if json_ld else None
    if not isinstance(json_ld, dict):
        return {"error": "Not found"}

    # "author" may be absent or null. Only read the identifier when it is
    # really an object (the previous check was inverted and called .get()
    # on None while ignoring real authors).
    username = "unknown"
    author = json_ld.get("author")
    if isinstance(author, dict):
        identifier = author.get("identifier")
        if isinstance(identifier, dict):
            username = identifier.get("value", "unknown")

    caption = json_ld.get("articleBody", "")

    def media_edge(typename: str, url: str, source: dict) -> dict:
        # One sidecar child in the same shape the route handlers consume.
        return {
            "node": {
                "__typename": typename,
                "display_url": url,
                "dimensions": {
                    "height": source.get("height"),
                    "width": source.get("width"),
                },
            }
        }

    # ``or []`` also covers an explicit JSON null for either key.
    media_edges = [
        media_edge("GraphVideo", video["contentUrl"], video)
        for video in json_ld.get("video") or []
    ]
    media_edges += [
        media_edge("GraphImage", image["url"], image)
        for image in json_ld.get("image") or []
    ]

    return {
        "shortcode_media": {
            "owner": {"username": username},
            "edge_media_to_caption": {"edges": [{"node": {"text": caption}}]},
            "edge_sidecar_to_children": {"edges": media_edges},
        }
    }
async def query_gql(post_id: str) -> dict:
    """Query Instagram's GraphQL endpoint for *post_id*.

    Uses the dedicated GraphQL client from ``app.state.gql_client``.

    Returns:
        The decoded JSON object on success. ``{"status": "fail"}`` when the
        request fails at the HTTP/transport level (timeouts included), when
        the body is not valid JSON, or when it is not a JSON object, so the
        caller can move on to its fallback instead of handling an exception.
    """
    client = app.state.gql_client
    params = {
        "query_hash": "b3055c01b4b222b8a47dc12b090e4e64",
        "variables": json.dumps({"shortcode": post_id}),
    }
    try:
        response = await client.get(
            "https://www.instagram.com/graphql/query/", params=params
        )
        payload = response.json()
    except (httpx.HTTPError, ValueError):
        # httpx.HTTPError covers ReadTimeout and the other transport errors;
        # ValueError covers a body that fails to decode as JSON.
        return {"status": "fail"}
    if not isinstance(payload, dict):
        return {"status": "fail"}
    return payload
def mediaid_to_code(media_id: int):
    """Convert a numeric media id into its base-64 style shortcode.

    The id is written in base 64 using the alphabet ``A-Z a-z 0-9 - _``,
    most significant digit first. Ids that are zero or negative yield an
    empty string.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
    digits = []
    remaining = media_id
    while remaining > 0:
        remaining, index = divmod(remaining, 64)
        # Digits come out least-significant first; reversed when joining.
        digits.append(alphabet[index])
    return "".join(reversed(digits))
@app.on_event("startup")
async def startup():
    """Create the shared disk cache and the two HTTP clients on app start."""
    # Limit cache to 5GB
    app.state.cache = Cache(
        "cache", size_limit=int(5e9), eviction_policy="least-recently-used"
    )

    unlimited = httpx.Limits(max_keepalive_connections=None, max_connections=None)

    def build_client(timeout, proxy_env):
        # Both clients differ only in timeout and in which environment
        # variable supplies the proxy for www.instagram.com.
        return httpx.AsyncClient(
            headers=headers,
            follow_redirects=True,
            timeout=timeout,
            limits=unlimited,
            proxies={"all://www.instagram.com": os.environ.get(proxy_env)},
        )

    app.state.client = build_client(120.0, "EMBED_PROXY")

    # GraphQL are constantly blocked,
    # it needs to use a different proxy (residential preferred)
    app.state.gql_client = build_client(5.0, "GRAPHQL_PROXY")
@app.on_event("shutdown")
async def shutdown():
    """Release everything that ``startup`` opened.

    Closes both HTTP clients and the disk cache; previously only the main
    client was closed, leaking the GraphQL client's connections and the
    cache handle.
    """
    await app.state.client.aclose()
    await app.state.gql_client.aclose()
    app.state.cache.close()
@app.get("/", response_class=HTMLResponse)
def root():
    """Serve the landing page from ``templates/home.html``."""
    with open("templates/home.html") as home_file:
        return HTMLResponse(content=home_file.read())
@app.get("/p/{post_id}")
@app.get("/p/{post_id}/{num}")
@app.get("/reel/{post_id}")
@app.get("/reels/{post_id}")
@app.get("/tv/{post_id}")
@app.get("/stories/{username}/{post_id}")
async def read_item(request: Request, post_id: str, num: Optional[int] = None):
    """Render the embed page for a post, reel, TV video or story.

    Requests whose User-Agent does not match the crawler pattern are
    redirected to the same path on instagram.com. For matching clients the
    post data is loaded and ``base.html`` is rendered with either an image,
    a video player or a grid image reference.

    Args:
        request: Incoming request (path and User-Agent are inspected).
        post_id: Shortcode, or the numeric media id for ``/stories/`` URLs.
        num: Optional 1-based index into a multi-item post. ``0`` is treated
            like the first item.

    Returns:
        A redirect, a rendered template, or the 404 page when the story id
        is not numeric or ``num`` does not address an existing item.
    """
    if "/stories/" in request.url.path:
        # isdecimal() (unlike isdigit()) only accepts characters that int()
        # can parse, so e.g. superscript digits no longer raise ValueError.
        if not post_id.isdecimal():
            return FileResponse("templates/404.html", status_code=404)
        post_id = mediaid_to_code(int(post_id))

    post_url = urljoin("https://www.instagram.com/", request.url.path)
    # Only crawlers / link previewers get the embed page.
    if not re.search(
        r"bot|facebook|embed|got|firefox\/92|firefox\/38|curl|wget|go-http|yahoo|generator|whatsapp|preview|link|proxy|vkshare|images|analyzer|index|crawl|spider|python|cfnetwork|node",
        request.headers.get("User-Agent", "").lower(),
    ):
        return RedirectResponse(post_url)

    data = await get_data(post_id)
    if "error" in data:
        ctx = {
            "request": request,
            "title": "InstaFix",
            "url": post_url,
            "description": "Sorry, this post isn't available.",
        }
        return templates.TemplateResponse("base.html", ctx, status_code=404)

    item = data["shortcode_media"]
    if "edge_sidecar_to_children" in item:
        media_lst = item["edge_sidecar_to_children"]["edges"]
        # Reject an empty list and any index outside 0..len: a negative num
        # used to wrap around through Python's negative indexing, and an
        # empty list raised IndexError.
        if not media_lst or (num is not None and not 0 <= num <= len(media_lst)):
            return FileResponse("templates/404.html", status_code=404)
        media = media_lst[num - 1 if num else 0]["node"]
    else:
        media = item

    typename = media.get("node", media)["__typename"]

    # Use the first caption edge, or an empty caption when there is none.
    description = item["edge_media_to_caption"]["edges"] or [{"node": {"text": ""}}]
    description = description[0]["node"]["text"]
    description = description[:200] + "..." if len(description) > 200 else description

    ctx = {
        "request": request,
        "url": post_url,
        "description": description,
        "post_id": post_id,
        "title": f"@{item['owner']['username']}",
        "width": media["dimensions"]["width"],
        "height": media["dimensions"]["height"],
    }

    # NOTE(review): "StoryVideo" is handled as an image here — confirm that
    # this is intentional.
    is_image = typename in ["GraphImage", "StoryImage", "StoryVideo"]
    if num is None and is_image:
        # No explicit index: show the combined grid of the post's images.
        ctx["image"] = f"/grid/{post_id}"
        ctx["card"] = "summary_large_image"
    elif typename == "GraphVideo":
        num = num if num else 1
        ctx["video"] = f"/videos/{post_id}/{num}"
        ctx["card"] = "player"
    elif is_image:
        num = num if num else 1
        ctx["image"] = f"/images/{post_id}/{num}"
        ctx["card"] = "summary_large_image"
    return templates.TemplateResponse("base.html", ctx)
@app.get("/videos/{post_id}/{num}")
async def videos(request: Request, post_id: str, num: int):
    """Redirect to the video file of item *num* of a post.

    Args:
        request: Incoming request (unused).
        post_id: Shortcode of the post.
        num: 1-based index into a multi-item post; ``0`` selects the first
            item. Ignored for single-item posts.

    Returns:
        A redirect to the video URL (routed through one of the
        ``WORKER_PROXY`` entries when configured), or the 404 page when the
        post is unavailable or ``num`` does not address an existing item.
    """
    data = await get_data(post_id)
    if "error" in data:
        return FileResponse("templates/404.html", status_code=404)

    item = data["shortcode_media"]
    if "edge_sidecar_to_children" in item:
        media_lst = item["edge_sidecar_to_children"]["edges"]
        # A negative num used to wrap around via negative indexing and an
        # empty list raised IndexError; both are now a 404.
        if not media_lst or not 0 <= num <= len(media_lst):
            return FileResponse("templates/404.html", status_code=404)
        media = media_lst[num - 1 if num else 0]
    else:
        media = item

    media = media.get("node", media)
    video_url = media.get("video_url", media["display_url"])

    # Proxy video via CF worker because Instagram speed limit
    worker_proxy = os.environ.get("WORKER_PROXY")
    if worker_proxy:
        params = urlencode({"url": video_url, "referer": "https://instagram.com/"})
        # WORKER_PROXY may list several workers separated by commas.
        wproxy = random.choice(worker_proxy.split(","))
        video_url = f"{wproxy}?{params}"
    return RedirectResponse(video_url)
@app.get("/images/{post_id}/{num}")
async def images(request: Request, post_id: str, num: int):
    """Redirect to the image file of item *num* of a post.

    Args:
        request: Incoming request (unused).
        post_id: Shortcode of the post.
        num: 1-based index into a multi-item post; ``0`` selects the first
            item. Ignored for single-item posts.

    Returns:
        A redirect to the item's ``display_url``, or the 404 page when the
        post is unavailable or ``num`` does not address an existing item.
    """
    data = await get_data(post_id)
    if "error" in data:
        return FileResponse("templates/404.html", status_code=404)

    item = data["shortcode_media"]
    if "edge_sidecar_to_children" in item:
        media_lst = item["edge_sidecar_to_children"]["edges"]
        # A negative num used to wrap around via negative indexing and an
        # empty list raised IndexError; both are now a 404.
        if not media_lst or not 0 <= num <= len(media_lst):
            return FileResponse("templates/404.html", status_code=404)
        media = media_lst[num - 1 if num else 0]
    else:
        media = item

    media = media.get("node", media)
    image_url = media["display_url"]
    return RedirectResponse(image_url)
@app.get("/oembed.json")
async def oembed(request: Request, post_id: str):
    """Return the oEmbed JSON document for *post_id*.

    The post caption, truncated to 200 characters plus ``...``, is placed in
    the ``author_name`` field. Responds with the 404 page when the post data
    contains an error.
    """
    data = await get_data(post_id)
    if "error" in data:
        return FileResponse("templates/404.html", status_code=404)

    caption_edges = data["shortcode_media"]["edge_media_to_caption"]["edges"]
    caption = caption_edges[0]["node"]["text"] if caption_edges else ""
    if len(caption) > 200:
        caption = caption[:200] + "..."

    payload = {
        "author_name": caption,
        "author_url": f"https://instagram.com/p/{post_id}",
        "provider_name": "InstaFix - Embed Instagram videos and images",
        "provider_url": "https://github.com/Wikidepia/InstaFix",
        "title": "Instagram",
        "type": "link",
        "version": "1.0",
    }
    return ORJSONResponse(payload)
@app.get("/grid/{post_id}")
async def grid(request: Request, post_id: str):
    """Serve a single WebP image combining up to four images of a post.

    A previously generated file under ``static/`` is served directly.
    Otherwise the post's image entries are downloaded, joined into a grid at
    most two images wide, written to ``static/`` and then served. Responds
    with the 404 page when the post is unavailable or has no image entries.
    """
    http_client = request.app.state.client
    grid_path = f"static/grid:{post_id}.webp"

    if os.path.exists(grid_path):
        return FileResponse(grid_path, media_type="image/webp")

    async def fetch_bytes(url):
        response = await http_client.get(url)
        return response.content

    data = await get_data(post_id)
    if "error" in data:
        return FileResponse("templates/404.html", status_code=404)

    item = data["shortcode_media"]
    if "edge_sidecar_to_children" in item:
        entries = item["edge_sidecar_to_children"]["edges"]
    else:
        entries = [item]

    image_types = ["GraphImage", "StoryImage", "StoryVideo"]
    image_urls = []
    for entry in entries:
        node = entry.get("node", entry)
        if node["__typename"] in image_types:
            image_urls.append(node["display_url"])
    # Limit to 4 images, Discord only show 4 images originally
    image_urls = image_urls[:4]

    # Download images and merge them into a single image
    downloads = await asyncio.gather(*(fetch_bytes(url) for url in image_urls))
    if not downloads:
        return FileResponse("templates/404.html", status_code=404)

    tiles = []
    for raw in downloads:
        tiles.append(pyvips.Image.new_from_buffer(raw, "", access="sequential"))

    columns = min(len(downloads), 2)
    joined = pyvips.Image.arrayjoin(tiles, across=columns, shim=10)
    joined.write_to_file(grid_path)
    return FileResponse(grid_path, media_type="image/webp")
@app.get("/health", response_class=PlainTextResponse)
def healthcheck():
    """Health endpoint: always answers with the plain-text body ``200``."""
    status_text = "200"
    return status_text