Commit 5daf3895 Author: 薛凌堃

8/26

Parent 45ef43f4
......@@ -36,6 +36,8 @@ class BaseCore:
# __cursor_proxy = None
cnx = None
cursor = None
cnx_ = None
cursor_ = None
r = None
# user-agent pool
__USER_AGENT_LIST = [
......@@ -241,6 +243,11 @@ class BaseCore:
charset='utf8mb4')
self.cursor = self.cnx.cursor()
# database on the .11 host
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
self.cursor_ = self.cnx_.cursor()
# Connect to Redis
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
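This hunk adds a second MySQL connection pair (cnx_/cursor_) pointed at the clb_project database on the .11 host, so attachment reads and writes can go there while the original pair keeps serving the other database. The Redis client feeds the work queues used later in this commit; a minimal sketch of that queue pattern, assuming redicPullData pops with lpop (its body is not shown in this diff):

import redis

r = redis.Redis(host="114.115.236.206", port=6379, password='***', db=6)  # password elided here
# A producer pushes social codes onto a named list...
r.rpush('NoticeEnterpriseFbs:gnqy_socialCode', '9110000071092841XX')
# ...and the crawler loop pops them one at a time (assumed lpop semantics).
social_code = r.lpop('NoticeEnterpriseFbs:gnqy_socialCode')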
......@@ -584,7 +591,7 @@ class BaseCore:
self.r.set(key, 0)
self.r.expire(key, 3600)
time.sleep(2)
# Upload to the file server
# Upload to the file server, and parse the PDF's content and page count
def upLoadToServe(self,pdf_url,type_id,social_code):
headers = {}
retData = {'state':False,'type_id':type_id,'item_id':social_code,'group_name':'group1','path':'','full_path':'',
......@@ -626,8 +633,14 @@ class BaseCore:
return retData
# Look up an existing attachment row by item/year/type (dedupe check)
def secrchATT(self,item_id,year,type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
self.cursor_.execute(sel_sql, (item_id, year, type_id))
selects = self.cursor_.fetchone()
return selects
# Insert into the att table; returns the attachment id
def tableUpdate(self,retData,com_name,year,order_by):
def tableUpdate(self,retData,com_name,year,pdf_name,num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
......@@ -639,23 +652,31 @@ class BaseCore:
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
self.cursor.execute(sel_sql, (item_id, year,type_id))
selects = self.cursor.fetchone()
order_by = num
selects = self.secrchATT(item_id,year,type_id)
# sel_sql = '''select id,item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
# self.cursor.execute(sel_sql, (item_id, year,type_id))
# selects = self.cursor.fetchone()
if selects:
self.getLogger().info(f'com_name:{com_name} already exists')
id = selects[0]
return id
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, com_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
self.cursor.execute(Upsql, values) # 插入
self.cnx.commit() # 提交
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
self.getLogger().info("更新完成:{}".format(Upsql))
selects = self.secrchATT(item_id,year,type_id)
id = selects[0]
return id
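After the INSERT, tableUpdate calls secrchATT a second time just to recover the new row's id. pymysql exposes the auto-increment id of the last INSERT on the same cursor, so a possible simplification (assuming clb_sys_attachment.id is AUTO_INCREMENT, which this diff does not show) would be:

self.cursor_.execute(Upsql, values)
self.cnx_.commit()
return self.cursor_.lastrowid  # id of the row just inserted; no second SELECT needed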
......
......@@ -301,9 +301,9 @@ def BaseInfoAbroad_task():
def FBS():
cnx,cursor = connectSql()
# TODO: switch this to pull from the Forbes database
gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
# gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
# cursor.execute(gw_query)
# gw_result = cursor.fetchall()
# Fetch domestic enterprises
gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
......@@ -311,17 +311,18 @@ def FBS():
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
gw_social_list = [item[0] for item in gw_result]
# gw_social_list = [item[0] for item in gw_result]
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
# for item in gn_social_list:
# if not r.exists(item):
for item in gn_social_list:
if not r.exists(item):
# r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
# r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
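Note that r.exists(item) tests for a standalone Redis key named after the social code, not for membership in the NoticeEnterpriseFbs list, so the guard only works if something else sets that key. A sketch of the marker pattern this implies; the marker-setting side is an assumption, not shown in this diff:

for social_code in gn_social_list:
    if not r.exists(social_code):  # skip codes already marked as handled
        r.rpush('NoticeEnterpriseFbs:gnqy_socialCode', social_code)
# The consumer would set the marker once a code is processed, e.g.:
# r.set(social_code, 1); r.expire(social_code, 3600)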
# Push the foreign IPO stock codes into Redis
......
"""
"""
......@@ -15,20 +15,18 @@ from bs4 import BeautifulSoup
from kafka import KafkaProducer
from datetime import datetime
from base import BaseCore
from fdfs_client.client import get_tracker_conf, Fdfs_client
# from fdfs_client.client import get_tracker_conf, Fdfs_client
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# cnx_ip = pymysql.connect(host='114.115.159.144',user='root', password='zzsn9988', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
cursor_ = cnx_.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
cnx = baseCore.cnx
cursor = baseCore.cursor
taskType = '企业公告/证监会'
# tracker_conf = get_tracker_conf('./client.conf')
# client = Fdfs_client(tracker_conf)
taskType = '企业公告/证监会/福布斯'
def RequestUrl(url, payload, social_code,start_time):
# ip = get_proxy()[random.randint(0, 3)]
......@@ -138,30 +136,25 @@ def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type
insert = False
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s'''
cursor_.execute(sel_sql, (social_code, pdf_url))
selects = cursor_.fetchone()
cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone()
if selects:
print(f'com_name:{short_name}, {pdf_url} already exists')
return insert
# Insert the record into the database
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
list_info = [
social_code,
name_pdf,
'', # summary
'', # body text
pub_time, # publish time
pdf_url,
'证监会',
report_type,
'1',
'zh'
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# the 144 database
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
insert = True
return insert
except:
......@@ -171,34 +164,42 @@ def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type
return insert
def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time):
sel_sql = "select article_id from brpa_source_article where source_address = %s"
cursor_.execute(sel_sql, pdf_url)
row = cursor_.fetchone()
id = row[0]
# First get the PDF link and download the pdf, then parse the content
try:
res = requests.get(pdf_url)
content = ''
# Read the file content
with fitz.open(stream=res.content, filetype='pdf') as doc:
for page in doc.pages():
content += page.get_text()
except:
# print('parse failed')
dic_result = {
'success': 'false',
'message': 'PDF parse failed',
'code': '204',
}
print(dic_result)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
# Upload to the file server and parse the PDF's content and page count
retData = baseCore.upLoadToServe(pdf_url,8,social_code)
# Bail out before touching the att table if the PDF could not be parsed
if not retData['state']:
log.info('====PDF parse failed====')
return False
# Insert the attachment into the att table; returns the attachment id
num = num + 1
att_id = baseCore.tableUpdate(retData,com_name,year,pdf_name,num)
content = retData['content']
# First get the PDF link and download the pdf, then parse the content
# try:
# res = requests.get(pdf_url)
# content = ''
# # Read the file content and parse it
# with fitz.open(stream=res.content, filetype='pdf') as doc:
# for page in doc.pages():
# content += page.get_text()
# except:
# # print('parse failed')
# dic_result = {
# 'success': 'false',
# 'message': 'PDF parse failed',
# 'code': '204',
# }
# log.info(dic_result)
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
# return False
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': id,
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
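The elided remainder of GetContent evidently serializes dic_news and ships it to Kafka (the KafkaProducer import at the top of this file and the attachmentIds change above both point that way). A minimal sketch of that step, with the broker address and topic name as placeholders rather than values from this diff:

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker-host:9092'],  # placeholder broker address
    value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))
producer.send('researchReportTopic', dic_news)  # topic name is a guess
producer.flush()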
......@@ -247,11 +248,12 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time):
# Collect the information
def SpiderByZJH(url, payload, dic_info, start_time): # dic_info: basic info fetched from the database
def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info: basic info fetched from the database
okCount = 0
errorCount = 0
social_code = dic_info[2]
short_name = dic_info[4]
com_name = dic_info[1]
soup = RequestUrl(url, payload, social_code, start_time)
if soup == '':
......@@ -298,9 +300,9 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info: basic info fetched from the database
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
year = pub_time[:4]
report_type = td_list[4].text.strip()
......@@ -311,7 +313,7 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info: basic info fetched from the database
# # Announcement info list
# okCount = okCount + 1
# Parse the PDF content: first get the PDF link and download it; parsing and transfer may each succeed or fail
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time)
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
if result:
# Announcement info list
......@@ -335,6 +337,7 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info: basic info fetched from the database
if __name__ == '__main__':
num = 0
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
......@@ -370,7 +373,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# Fetch enterprise info
social_code = baseCore.redicPullData('NoticeEnterprise:gnqy_socialCode')
# social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
social_code = '9110000071092841XX'
# If Redis has run out of data, wait
if social_code == None:
time.sleep(20)
......@@ -391,15 +395,16 @@ if __name__ == '__main__':
# Stock codes starting with 0, 2 or 3 are on the Shenzhen exchange; 6 or 9 the Shanghai exchange; 4 or 8 the Beijing exchange
code = dic_info[3]
short_name = dic_info[4]
com_name = dic_info[1]
dic_parms = getUrl(code, url_parms, Catagory2_parms)
dic_parms_ls = getUrl(code, url_parms_ls, Catagory2_parms_ls)
if len(dic_parms) > 0:
start_time_cj = time.time()
SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, start_time)
log.info(f'{code}==========={short_name}, issuance announcements, took {baseCore.getTimeCost(start_time_cj, time.time())}')
SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, start_time,num)
log.info(f'{code}==========={short_name},{com_name}, issuance announcements, took {baseCore.getTimeCost(start_time_cj, time.time())}')
start_time_ls = time.time()
SpiderByZJH(dic_parms_ls['url'], dic_parms_ls['payload'], dic_info, start_time)
log.info(f'{code}==========={short_name}, interim reports, took {baseCore.getTimeCost(start_time_ls, time.time())}')
SpiderByZJH(dic_parms_ls['url'], dic_parms_ls['payload'], dic_info, start_time,num)
log.info(f'{code}==========={short_name},{com_name}, interim reports, took {baseCore.getTimeCost(start_time_ls, time.time())}')
# UpdateInfoSql(retData,retData_ls,social_code)
# log.info(f'{code}================update succeeded')
end_time = time.time()
......@@ -410,7 +415,7 @@ if __name__ == '__main__':
cursor.close()
cnx.close()
cursor_.close()
cnx_.close()
# cursor_.close()
# cnx_.close()
# Release resources
baseCore.close()
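baseCore.close() itself is not part of this diff; given the new cnx_/cursor_ pair added to BaseCore above, it presumably has to release both connection pairs. A sketch of what such a method might look like, purely as an assumption:

def close(self):
    # Release both MySQL connection pairs; attribute names match the BaseCore hunk above.
    for cur, conn in ((self.cursor, self.cnx), (self.cursor_, self.cnx_)):
        try:
            cur.close()
            conn.close()
        except Exception:
            pass  # already closed or never opened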