提交 1b942b41 作者: LiuLiYuan

Merge remote-tracking branch 'origin/master'

"""
"""
......@@ -455,12 +455,10 @@ def getReportTime():
list_date.append(date)
return list_date
def job(taskType):
# 将上市企业库中的全部A股代码存入list
# 需要提供股票代码、企业信用代码
def job(taskType):
# 将上市企业库中的全部A股代码存入list
# 需要提供股票代码、企业信用代码
while True:
# 从redis中获取企业信用代码
social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode')
......@@ -505,7 +503,7 @@ def job(taskType):
delist_all = []
info_date_list = []
dic_info = get_info(social_code, com_code, info_date, delist_all, info_date_list, taskType)
# print(dic_info)
print(dic_info)
# 将采集后的报告期存入redis
if len(info_date_list) != 0:
for date in info_date_list:
......@@ -539,9 +537,7 @@ def job(taskType):
cnx.close()
cursor.close()
baseCore.close()
cnx.close()
cursor.close()
baseCore.close()
if __name__=='__main__':
task_type = '财务数据/东方财富网'
......
......@@ -327,7 +327,7 @@ if __name__ == '__main__':
#从redis里拿数据
while True:
# TODO:需要隔两个小时左右抓包修改,token从数据库中获得
token = baseCore.GetToken()
token = '83a9a9be4e9ecf3a8f8a20364227dc5d'
list_weicha = []
list_all_info = []
name_list = []
......
"""
"""
......@@ -23,10 +23,55 @@ log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
# tracker_conf = get_tracker_conf('./client.conf')
# client = Fdfs_client(tracker_conf)
taskType = '企业公告/证监会/福布斯'
taskType = '企业公告/证监会'
def secrchATT(item_id, name, type_id):
    """Look up an attachment row in ``clb_sys_attachment``.

    Args:
        item_id: Value matched against the ``item_id`` column.
        name: Value matched against the ``name`` column.
        type_id: Value matched against the ``type_id`` column.

    Returns:
        The result of ``cursor_.fetchone()`` for the query, which selects
        only the ``id`` column of a matching row.
    """
    query = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s '''
    # The values are handed to execute() separately from the SQL text.
    params = (item_id, name, type_id)
    cursor_.execute(query, params)
    return cursor_.fetchone()
def tableUpdate(retData, com_name, year, pdf_name, num):
    """Insert an attachment record into ``clb_sys_attachment`` and return its id.

    If a row with the same ``item_id``, ``pdf_name`` and ``type_id`` is
    already present, nothing is inserted and the existing id is returned.

    Args:
        retData: Mapping providing the attachment fields read below.
        com_name: Company name, used only in the "already exists" log line.
        year: Value stored in the ``year`` column.
        pdf_name: Value stored in the ``name`` column; also part of the lookup.
        num: Value stored in the ``order_by`` column.

    Returns:
        The first column (``id``) of the matching attachment row.
    """
    # Read every field up front, in a fixed order, so a missing key raises
    # before any query is issued.
    field_names = (
        'item_id', 'type_id', 'group_name', 'path', 'full_path', 'category',
        'file_size', 'status', 'create_by', 'page_size', 'create_time',
    )
    (item_id, type_id, group_name, path, full_path, category,
     file_size, status, create_by, page_size, create_time) = (
        retData[field] for field in field_names
    )

    existing = secrchATT(item_id, pdf_name, type_id)
    if existing:
        log.info(f'com_name:{com_name}已存在')
        return existing[0]

    insert_sql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
    row = (
        year, pdf_name, type_id, item_id, group_name, path, full_path,
        category, file_size, num, status, create_by, create_time, page_size,
    )
    cursor_.execute(insert_sql, row)
    cnx_.commit()
    log.info("更新完成:{}".format(insert_sql))

    # Query again to obtain the id of the row that was just written.
    inserted = secrchATT(item_id, pdf_name, type_id)
    return inserted[0]
def RequestUrl(url, payload, social_code,start_time):
# ip = get_proxy()[random.randint(0, 3)]
......@@ -132,7 +177,7 @@ def getUrl(code, url_parms, Catagory2_parms):
return dic_parms
def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type):
def InsterInto(short_name, social_code, pdf_url):
inster = False
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' '''
......@@ -174,7 +219,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
log.info(f'====pdf解析失败====')
return False
num = num + 1
att_id = baseCore.tableUpdate(retData,com_name,year,pdf_name,num)
att_id = tableUpdate(retData,com_name,year,pdf_name,num)
content = retData['content']
if retData['state']:
pass
......@@ -292,7 +337,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
report_type = td_list[4].text.strip()
# 信息插入数据库
insert = InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type)
insert = InsterInto(short_name, social_code, name_pdf)
if insert:
# # 公告信息列表
......
......@@ -7,6 +7,8 @@ import pymysql
import requests
from base.BaseCore import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
......@@ -22,9 +24,11 @@ headers = {
'version':'TYC-Web',
'Content-Type':'application/json;charset=UTF-8'
}
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor= cnx.cursor()
# cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# cursor= cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '天眼查企业id/天眼查'
#根据信用代码获取天眼查id 企业名字等信息
def getTycIdByXYDM(xydm):
retData={'state':False,'tycData':None,'reput':True}
......@@ -59,39 +63,52 @@ def getTycIdByXYDM(xydm):
def updateTycInfo(id, retData):
    """Write a Tianyancha lookup result back to a ``tyc_com_info`` row.

    When ``retData['state']`` is truthy the row is set to ``state=3`` and the
    fields taken from ``retData['tycData']`` are stored; otherwise the row is
    only set to ``state=4``. The change is committed before returning.

    The values are passed to ``cursor.execute`` as parameters instead of
    being formatted into the SQL string, so a quote character in a scraped
    company name can no longer break or alter the statement.
    NOTE(review): with driver-side parameters a ``None`` field is presumably
    sent as SQL NULL rather than the text ``'None'`` -- confirm this is the
    desired stored value.

    Args:
        id: Primary key of the ``tyc_com_info`` row to update.
        retData: Mapping with a ``state`` flag and, when the flag is truthy,
            a ``tycData`` mapping holding ``id``, ``type``, ``comName``,
            ``name``, ``alias``, ``logo``, ``claimLevel`` and ``regStatus``.
    """
    if retData['state']:
        data = retData['tycData']
        updateSql = (
            "update tyc_com_info set state=3,update_date=now(),tycid=%s,type=%s,"
            "comName=%s,name=%s,alias=%s,logo=%s,claimLevel=%s,regStatus=%s "
            "where id=%s"
        )
        params = (
            data['id'], data['type'], data['comName'], data['name'],
            data['alias'], data['logo'], data['claimLevel'], data['regStatus'],
            id,
        )
    else:
        updateSql = "update tyc_com_info set state=4,update_date=now() where id=%s"
        params = (id,)
    cursor.execute(updateSql, params)
    cnx.commit()
def updateTycInfo():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
# social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
social_code = '9111000066990444XF'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
continue
start = time.time()
#采集工作
def beginWork():
while True :
start =time.time()
selectSql="select id,xydm from tyc_com_info where state=1 order by update_date asc ,id asc limit 1"
cursor.execute(selectSql)
data = cursor.fetchone()
if data:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
log.info("没有数据了,结束脚本")
break
data_list = list(data)
id = data_list[0]
xydm = data_list[1]
# 数据重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
xydm = data[2]
tycid = data[11]
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
updateTycInfo(id,retData)
log.info(f"{id}---{xydm}----处理完成,耗时:{baseCore.getTimeCost(start,time.time())}")
cursor.close()
cnx.close()
# 释放资源
baseCore.close()
if retData['tycData'] and retData['reput']:
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
elif not retData['tycData'] and retData['reput']:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
elif not retData['reput'] and not retData['tycData']:
continue
except Exception as e:
log.error(e)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
if __name__ == '__main__':
beginWork()
\ No newline at end of file
updateTycInfo()
\ No newline at end of file
......@@ -283,7 +283,7 @@ def doJob():
try:
retData = getTycIdByXYDM(xydm)
if retData['tycData'] and retData['reput']:
tycid = retData['id']
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
......
......@@ -57,7 +57,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(60)
time.sleep(20)
s = requests.session()
#获取到token和cookies
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
请 注册 或者 登录 后发表评论