-
Notifications
You must be signed in to change notification settings - Fork 379
/
Copy pathapi.py
71 lines (59 loc) · 2.91 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import streamlink
from streamlink import Streamlink
from streamlink.exceptions import NoPluginError, PluginError
from streamlink.stream import DASHStream
import urllib.request
from urllib.error import URLError
from functools import lru_cache
@lru_cache(maxsize=32)
def get_streams(query, quality="best", proxy=None, proxy_type=None, low_latency=True):
try:
# Create a new Streamlink session
session = Streamlink()
# Configure proxy if provided
if proxy:
if proxy_type not in ('http', 'https', 'socks4', 'socks5'):
return "Invalid proxy type. Supported types are: http, https, socks4, socks5"
session.set_option("http-proxy", f"{proxy_type}://{proxy}")
# Configure urllib to use the same proxy
proxy_handler = urllib.request.ProxyHandler({proxy_type: proxy})
opener = urllib.request.build_opener(proxy_handler)
urllib.request.install_opener(opener)
# Set low latency options
if low_latency:
session.set_option("low-latency", True)
session.set_option("hls-live-edge", 1)
session.set_option("hls-segment-stream-data", True)
session.set_option("stream-segment-threads", 2)
session.set_option("ringbuffer-size", 1024)
# Platform-specific optimizations
if 'twitch.tv' in query:
session.set_option("twitch-low-latency", True)
session.set_option("twitch-disable-ads", True)
elif 'youtube.com' in query or 'youtu.be' in query:
session.set_option("youtube-live-edge", 1)
streams = session.streams(query)
if not streams:
return "No streams found."
for stream_quality, link in streams.items():
if quality and quality != stream_quality:
continue
url = link.to_url()
if any(domain in query for domain in ('dailymotion.com', 'dai.ly')):
manifest_url = link.to_manifest_url()
if 'https://www.dailymotion.com/cdn/live/video/' in manifest_url:
try:
with urllib.request.urlopen(manifest_url) as response:
data = response.read().decode('utf-8')
for key in ('live-3', 'live-2', 'live-1'):
if key in data:
return url.replace('live-0', key)
return url
except URLError as e:
return f"Error fetching manifest: {e}"
return manifest_url
if isinstance(link, DASHStream) or any(term in stream_quality for term in ("best", "live", "http")) or "chunklist" in url:
return url
return link.to_manifest_url()
except (ValueError, NoPluginError, PluginError) as ex:
return f"An error occurred: {ex}"