Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

利用MySQL ON DUPLICATE KEY UPDATE方法解决重复执行一些任务出现IntegrityError错误问题 #139

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions instock/lib/database.py
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

测试不过。
C:\Python\Python3\python.exe D:\my\src\py\stock\instock\job\selection_data_daily_job.py
ERROR:root:database.insert_other_db_from_df处理异常:cn_stock_selection表Can't reconnect until invalid transaction is rolled back. Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sqlalchemy import create_engine
from sqlalchemy.types import NVARCHAR
from sqlalchemy import inspect
from sqlalchemy.dialects.mysql import insert

__author__ = 'myh '
__date__ = '2023/3/10 '
Expand Down Expand Up @@ -86,16 +87,17 @@ def insert_other_db_from_df(to_db, data, table_name, cols_type, write_index, pri
if write_index:
# 插入到第一个位置:
col_name_list.insert(0, data.index.name)
method=insert_on_duplicate(except_cols_on_update=primary_keys.split(','))
try:
if cols_type is None:
data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
index=write_index, )
index=write_index, method=method, )
elif not cols_type:
data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
dtype={col_name: NVARCHAR(255) for col_name in col_name_list}, index=write_index, )
dtype={col_name: NVARCHAR(255) for col_name in col_name_list}, index=write_index, method=method, )
else:
data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
dtype=cols_type, index=write_index, )
dtype=cols_type, index=write_index, method=method, )
except Exception as e:
logging.error(f"database.insert_other_db_from_df处理异常:{table_name}表{e}")

Expand Down Expand Up @@ -204,3 +206,15 @@ def executeSqlCount(sql, params=()):
except Exception as e:
logging.error(f"database.select_count计算数量处理异常:{e}")
return 0

#MySQL ON DUPLICATE KEY UPDATE 方法更新数据
def insert_on_duplicate(except_cols_on_update=[]):
def method(table, conn, keys, data_iter):
update_keys = [key for key in keys if key not in except_cols_on_update]

insert_stmt = insert(table.table).values([dict(zip(keys, data)) for data in data_iter])
upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted if x.name in update_keys})
result = conn.execute(upsert_stmt)
return result.rowcount

return method