-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.py
executable file
·103 lines (92 loc) · 4.22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
# encoding: utf-8
import wx_send
import requests
import json
import datetime
import Bond
PAGE_URL = "http://data.eastmoney.com/kzz/default.html"
TARGET_URL = "http://datacenter-web.eastmoney.com/api/data/v1/get"
PARAMS = {
'sortColumns': 'PUBLIC_START_DATE',
'sortTypes': -1,
'pageSize': 50,
'pageNumber': 1,
'reportName': "RPT_BOND_CB_LIST",
'quoteColumns': "f2~01~CONVERT_STOCK_CODE~CONVERT_STOCK_PRICE,f235~10~SECURITY_CODE~TRANSFER_PRICE,f236~10~SECURITY_CODE~TRANSFER_VALUE,f2~10~SECURITY_CODE~CURRENT_BOND_PRICE,f237~10~SECURITY_CODE~TRANSFER_PREMIUM_RATIO,f239~10~SECURITY_CODE~RESALE_TRIG_PRICE,f240~10~SECURITY_CODE~REDEEM_TRIG_PRICE,f23~01~CONVERT_STOCK_CODE~PBV_RATIO",
'columns': "ALL",
'source': "WEB",
'client': "WEB"
}
def log(string):
for line in string.splitlines():
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = "[%s] %s" % (now, line)
print(line)
def main():
req = requests.get(TARGET_URL, params=PARAMS)
js = json.loads(req.text)
data = js["result"]["data"]
recently_public_bonds = []
recently_listing_bonds = []
today = datetime.date.today()
dayRel = ["今天", "明天", "后天"]
for item in data:
if item['PUBLIC_START_DATE'] is not None:
start_date = datetime.datetime.strptime(
item['PUBLIC_START_DATE'], '%Y-%m-%d %H:%M:%S').date()
gap = (start_date - today).days
if gap >= 0 and gap < 3:
recently_public_bonds.append(
Bond.Bond(item["SECURITY_NAME_ABBR"], item["SECURITY_CODE"], item["CONVERT_STOCK_PRICE"], item["TRANSFER_PRICE"], item['PUBLIC_START_DATE'], item['LISTING_DATE']))
if item['LISTING_DATE'] is not None:
list_date = datetime.datetime.strptime(
item['LISTING_DATE'], '%Y-%m-%d %H:%M:%S').date()
gap = (list_date - today).days
if gap >= 0 and gap < 2:
recently_listing_bonds.append(
Bond.Bond(item["SECURITY_NAME_ABBR"], item["SECURITY_CODE"], item["CONVERT_STOCK_PRICE"], item["TRANSFER_PRICE"], item['PUBLIC_START_DATE'], item['LISTING_DATE']))
wx_msg = '\n'
wx_msg += '近日发售:\n\n'
if len(recently_public_bonds) > 0:
for bond in recently_public_bonds[::-1]:
if bond.public_start_date is not None:
gap = (bond.public_start_date - today).days
if gap < len(dayRel):
wx_msg += '- %s: 债券代码:%s,债券简称:%s,正股价:%s,转股价:%s, 转股价值:%.2f' % (
dayRel[gap],
bond.code,
bond.name,
bond.price if bond.price is not None else "-",
bond.swap_price if bond.swap_price is not None else "-",
bond.swap_value if bond.swap_price is not None else 0.0)
if bond.swap_price is not None:
if bond.swap_value > 97:
wx_msg += ' ✅'
elif bond.swap_value < 90:
wx_msg += ' ❌'
else:
wx_msg += ' ⚠️'
else:
wx_send.wx_send(
title='每日可转债', content="bond.public_start_date 不应该为空,请检查代码或API")
raise RuntimeError("bond.public_start_date 不应该为空,请检查代码或API")
wx_msg += '\n\n'
else:
wx_msg += '- 无\n\n'
wx_msg += '近日上市:\n\n'
if len(recently_listing_bonds) > 0:
for bond in recently_listing_bonds[::-1]:
gap = (bond.listing_date - today).days
if gap < len(dayRel):
wx_msg += f'- {dayRel[gap]}: 债券代码:{bond.code},债券简称:{bond.name}\n\n'
else:
wx_msg += '- 无\n\n'
if len(recently_public_bonds) > 0 or len(recently_listing_bonds) > 0:
wx_msg += f'⬇️点击下方链接查看一览表⬇️\n{PAGE_URL}'
log(wx_msg)
wx_send.wx_send(title='每日可转债', content=wx_msg)
else:
log("今日无可转债发行或上市")
if __name__ == "__main__":
main()