提交 27673f2e 作者: 薛凌堃

redis

上级 5aa47744
...@@ -7,14 +7,17 @@ from apscheduler.schedulers.blocking import BlockingScheduler ...@@ -7,14 +7,17 @@ from apscheduler.schedulers.blocking import BlockingScheduler
basecore = BaseCore.BaseCore() basecore = BaseCore.BaseCore()
log = basecore.getLogger() log = basecore.getLogger()
#144数据库
cnx = basecore.cnx
cursor = basecore.cursor
r = basecore.r r = basecore.r
#11数据库 def cnn11():
cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4') #11数据库
cursor_ = cnx_.cursor() cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx_.cursor()
return cnx_,cursor_
def close11(cnx_,cursor_):
cnx_.close()
cursor_.close()
# # 连接到Redis # # 连接到Redis
# r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6) # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
...@@ -37,7 +40,20 @@ cursor_ = cnx_.cursor() ...@@ -37,7 +40,20 @@ cursor_ = cnx_.cursor()
# return gn_social_list,gw_social_list # return gn_social_list,gw_social_list
#企业动态 #企业动态
#创建数据库连接
def connectSql():
cnx = basecore.cnx
cursor = basecore.cursor
return cnx,cursor
#关闭数据库连接
def closeSql(cnx,cursor):
cnx.close()
cursor.close()
def NewsEnterprise(): def NewsEnterprise():
cnx,cursor = connectSql()
# #获取国内企业 # #获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1'" gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
cursor.execute(gn_query) cursor.execute(gn_query)
...@@ -51,26 +67,24 @@ def NewsEnterprise(): ...@@ -51,26 +67,24 @@ def NewsEnterprise():
#todo:打印长度 #todo:打印长度
# print(len(gw_social_list)) # print(len(gw_social_list))
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
print('=======') print('=======')
#将数据插入到redis中 #将数据插入到redis中
# for item in gn_social_list: for item in gn_social_list:
# r.rpush('NewsEnterprise:gnqy_socialCode', item) r.rpush('NewsEnterprise:gnqy_socialCode', item)
# count = 0
for item in gw_social_list: for item in gw_social_list:
r.rpush('NewsEnterprise:gwqy_socialCode', item) r.rpush('NewsEnterprise:gwqy_socialCode', item)
# count+=1 closeSql(cnx,cursor)
# print(item)
# print(count)
#企业动态定时任务 #企业动态定时任务
def NewsEnterprise_task(): def NewsEnterprise_task():
# 实例化一个调度器 # 实例化一个调度器
scheduler = BlockingScheduler() scheduler = BlockingScheduler()
# 每天执行一次 # 每天执行一次
scheduler.add_job(NewsEnterprise, 'cron', hour=12,minute=0,max_instances=2) scheduler.add_job(NewsEnterprise, 'cron', hour=0,minute=0,max_instances=2)
try: try:
# redisPushData # 定时开始前执行一次 # redisPushData # 定时开始前执行一次
NewsEnterprise()
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
...@@ -78,14 +92,17 @@ def NewsEnterprise_task(): ...@@ -78,14 +92,17 @@ def NewsEnterprise_task():
#企业公告 #企业公告
def NoticeEnterprise(): def NoticeEnterprise():
cnx,cursor = connectSql()
# 获取国内企业 # 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 10 " gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null "
cursor.execute(gn_query) cursor.execute(gn_query)
gn_result = cursor.fetchall() gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('NoticeEnterprise:gnqy_socialCode', item) r.rpush('NoticeEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
#企业公告定时任务 #企业公告定时任务
def NoticeEnterprise_task(): def NoticeEnterprise_task():
# 实例化一个调度器 # 实例化一个调度器
...@@ -101,14 +118,16 @@ def NoticeEnterprise_task(): ...@@ -101,14 +118,16 @@ def NoticeEnterprise_task():
#企业年报 #企业年报
def AnnualEnterprise(): def AnnualEnterprise():
cnx,cursor = connectSql()
# 获取国内企业 # 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 10" gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
cursor.execute(gn_query) cursor.execute(gn_query)
gn_result = cursor.fetchall() gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item) r.rpush('AnnualEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
#企业年报定时任务 #企业年报定时任务
def AnnualEnterprise_task(): def AnnualEnterprise_task():
...@@ -117,7 +136,8 @@ def AnnualEnterprise_task(): ...@@ -117,7 +136,8 @@ def AnnualEnterprise_task():
# 每年执行一次 # 每年执行一次
scheduler.add_job(AnnualEnterprise, 'cron', second='*/10') scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
try: try:
# redisPushData # 定时开始前执行一次 # 定时开始前执行一次
AnnualEnterprise()
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
...@@ -125,23 +145,26 @@ def AnnualEnterprise_task(): ...@@ -125,23 +145,26 @@ def AnnualEnterprise_task():
#企业基本信息 #企业基本信息
def BaseInfoEnterprise(): def BaseInfoEnterprise():
cnx,cursor = connectSql()
# 获取国内企业 # 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 10 " gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
cursor.execute(gn_query) cursor.execute(gn_query)
gn_result = cursor.fetchall() gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('BaseInfoEnterprise:gnqy_socialCode', item) r.rpush('BaseInfoEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
#企业基本信息定时任务 #企业基本信息定时任务
def BaseInfoEnterprise_task(): def BaseInfoEnterprise_task():
# 实例化一个调度器 # 实例化一个调度器
scheduler = BlockingScheduler() scheduler = BlockingScheduler()
# 每年执行一次 # 每年执行一次
scheduler.add_job(BaseInfoEnterprise, 'cron', second='*/10') scheduler.add_job(BaseInfoEnterprise, 'cron', month=1, day=1, hour=0, minute=0)
try: try:
# redisPushData # 定时开始前执行一次 # 定时开始前执行一次
BaseInfoEnterprise()
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
...@@ -149,6 +172,7 @@ def BaseInfoEnterprise_task(): ...@@ -149,6 +172,7 @@ def BaseInfoEnterprise_task():
#东方财富网财务数据 #东方财富网财务数据
def FinanceFromEast(): def FinanceFromEast():
cnx_,cursor_ = cnn11()
#从上市企业库中读取数据 #从上市企业库中读取数据
sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' limit 10 ''' sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' limit 10 '''
cursor_.execute(sql_sel) cursor_.execute(sql_sel)
...@@ -157,6 +181,7 @@ def FinanceFromEast(): ...@@ -157,6 +181,7 @@ def FinanceFromEast():
print('=======') print('=======')
for item in finance_list: for item in finance_list:
r.rpush('FinanceFromEast:finance_socialCode', item) r.rpush('FinanceFromEast:finance_socialCode', item)
close11(cnx_,cursor_)
def FinanceFromEase_task(): def FinanceFromEase_task():
# 实例化一个调度器 # 实例化一个调度器
...@@ -164,7 +189,7 @@ def FinanceFromEase_task(): ...@@ -164,7 +189,7 @@ def FinanceFromEase_task():
# 每个季度执行一次 # 每个季度执行一次
scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1',hour=0, minute=0) scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1',hour=0, minute=0)
try: try:
# redisPushData # 定时开始前执行一次 # 定时开始前执行一次
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
...@@ -172,29 +197,33 @@ def FinanceFromEase_task(): ...@@ -172,29 +197,33 @@ def FinanceFromEase_task():
#微信公众号 #微信公众号
def WeiXingetFromSql(): def WeiXingetFromSql():
cnx_,cursor_=cnn11()
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'" selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql) cursor_.execute(selectSql)
results = cursor.fetchall() results = cursor_.fetchall()
result_list = [item[0] for item in results] result_list = [item[0] for item in results]
#放入redis #放入redis
for item in result_list: for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item) r.rpush('WeiXinGZH:infoSourceCode', item)
close11(cnx_,cursor_)
#微信公众号定时任务 #微信公众号定时任务
def weixin_task(): def weixin_task():
# 实例化一个调度器 # 实例化一个调度器
scheduler = BlockingScheduler() scheduler = BlockingScheduler()
# 每天执行一次 # 每天执行一次
scheduler.add_job(WeiXingetFromSql, 'cron', hour=12,minute=0) scheduler.add_job(WeiXingetFromSql, 'cron', hour=0,minute=0)
try: try:
# redisPushData # 定时开始前执行一次 # 定时开始前执行一次
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
pass pass
# 企业年报——雪球网 # 企业年报——雪球网
def AnnualEnterpriseIPO(): def AnnualEnterpriseXueQ():
cnx,cursor = connectSql()
# 获取国内上市企业 # 获取国内上市企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1 limit 10" gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1 limit 10"
cursor.execute(gn_query) cursor.execute(gn_query)
...@@ -203,8 +232,24 @@ def AnnualEnterpriseIPO(): ...@@ -203,8 +232,24 @@ def AnnualEnterpriseIPO():
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('AnnualEnterprise:gnshqy_socialCode', item) r.rpush('AnnualEnterprise:gnshqy_socialCode', item)
closeSql(cnx,cursor)
#雪球网年报定时任务
def AnnualEnterpriseXueQ_task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每年执行一次
scheduler.add_job(AnnualEnterpriseXueQ, 'cron', month=1, day=1, hour=0, minute=0)
try:
# 定时开始前执行一次
AnnualEnterpriseXueQ()
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
#国外企业基本信息 #国外企业基本信息
def BaseInfoEnterpriseAbroad(): def BaseInfoEnterpriseAbroad():
cnx,cursor = connectSql()
# 获取国外企业 # 获取国外企业
gn_query = "select id from EnterpriseInfo where Place = '2' limit 10 " gn_query = "select id from EnterpriseInfo where Place = '2' limit 10 "
cursor.execute(gn_query) cursor.execute(gn_query)
...@@ -213,9 +258,24 @@ def BaseInfoEnterpriseAbroad(): ...@@ -213,9 +258,24 @@ def BaseInfoEnterpriseAbroad():
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('BaseInfoEnterprise:gwqy_socialCode', item) r.rpush('BaseInfoEnterprise:gwqy_socialCode', item)
closeSql(cnx,cursor)
#国外基本信息定时任务
def BaseInfoAbroad_task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每个月执行一次
scheduler.add_job(BaseInfoEnterpriseAbroad, 'cron', day=1,hour=0, minute=0)
try:
# redisPushData # 定时开始前执行一次
BaseInfoEnterpriseAbroad()
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
##福布斯=====从数据库中读取信息放入redis #福布斯=====从数据库中读取信息放入redis
def FBS(): def FBS():
cnx,cursor = connectSql()
# todo:调整为获取福布斯的数据库 # todo:调整为获取福布斯的数据库
# gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2" # gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2"
# cursor.execute(gw_query) # cursor.execute(gw_query)
...@@ -228,14 +288,14 @@ def FBS(): ...@@ -228,14 +288,14 @@ def FBS():
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
# gw_social_list = [item[0] for item in gw_result] # gw_social_list = [item[0] for item in gw_result]
#
# for item in gw_social_list: # for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item) # r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
for item in gn_social_list: for item in gn_social_list:
if not r.exists(item): if not r.exists(item):
r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item) r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
closeSql(cnx,cursor)
if __name__ == "__main__": if __name__ == "__main__":
start = time.time() start = time.time()
...@@ -243,15 +303,13 @@ if __name__ == "__main__": ...@@ -243,15 +303,13 @@ if __name__ == "__main__":
# AnnualEnterpriseIPO() # AnnualEnterpriseIPO()
# AnnualEnterprise() # AnnualEnterprise()
# BaseInfoEnterpriseAbroad() # BaseInfoEnterpriseAbroad()
# NewsEnterprise_task() NewsEnterprise_task()
# NewsEnterprise() # NewsEnterprise()
# BaseInfoEnterprise() # BaseInfoEnterprise()
# FBS() # FBS()
# NoticeEnterprise_task() NoticeEnterprise_task()
# AnnualEnterprise_task() AnnualEnterprise_task()
# NoticeEnterprise() # NoticeEnterprise()
FinanceFromEast() # FinanceFromEast()
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===') log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
cnx_.close()
cursor_.close()
# basecore.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论