提交 83f1bc4d 作者: LiuLiYuan

年报 10/13

上级 7bc6080a
......@@ -342,9 +342,10 @@ class BaseCore:
# 获取logger
def getLogger(self, fileLogFlag=True, stdOutFlag=True):
pid = self.getPID()
dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log"
filename = filename.replace(".py", "") + f"{pid}.log"
if not os.path.exists(dirname):
os.mkdir(dirname)
......@@ -660,7 +661,7 @@ class BaseCore:
return selects
# 插入到att表 返回附件id
def tableUpdate(self, retData, com_name, year, pdf_name, num):
def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
......@@ -680,12 +681,12 @@ class BaseCore:
id = ''
return id
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,publish_time,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'zzsn')'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
create_time, page_size, path, pub_time)
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
......@@ -733,11 +734,11 @@ class BaseCore:
else:
self.getLogger().info(f'=====文件存在obs========{file_path}')
def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time):
def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time,create_by):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': create_by,
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
......@@ -783,7 +784,8 @@ class BaseCore:
return retData
return retData
@retry(tries=3,delay=1)
@retry(tries=3, delay=1)
def getOBSres(self, pathType, now_time, name, response):
result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
# resp = obsClient.putFile('zzsn', f'{pathType}{now_time}/' + name, file_path='要上传的那个文件的本地路径')
......
import json
import json
......@@ -21,7 +21,7 @@ tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/证监会'
pathType = 'ZJHAnnualReport/'
pathType = 'QYYearReport/'
def RequestUrl(url, payload, item_id, start_time):
# ip = get_proxy()[random.randint(0, 3)]
......@@ -125,7 +125,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
log.info(f'com_name:{short_name}、{year}已存在')
continue
else:
retData = baseCore.uptoOBS(pdf_url,name_pdf, 1, social_code,pathType,taskType,start_time)
retData = baseCore.uptoOBS(pdf_url,name_pdf, 1, social_code,pathType,taskType,start_time,'XueLingKun')
if retData['state']:
pass
else:
......
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -156,7 +156,7 @@ def spider_annual_report(dict_info,num):
continue
else:
#上传文件至obs服务器
retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time)
retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time,'XueLingKun')
if retData['state']:
pass
else:
......@@ -235,7 +235,7 @@ def spider_annual_report(dict_info,num):
if __name__ == '__main__':
num = 0
taskType = '企业年报/雪球网'
pathType = 'XQWAnnualReport/'
pathType = 'QYYearReport/'
while True:
start_time = time.time()
# 获取企业信息
......
......@@ -26,7 +26,7 @@ headers = {
'Pragma': 'no-cache'
}
pattern = r"\d{4}-\d{2}-\d{2}"
pathType = 'QYNotice/'
def ifInstert(social_code, pdf_url):
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='新浪财经' and type='1' '''
......@@ -54,14 +54,14 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time, com_n
if not response:
return False
# 上传至华为云服务器
retData = baseCore.uptoOBS(pdf_url, pdf_name, 8, social_code, 'XLCJNotice', taskType, start_time, 'LiuLiYuan')
retData = baseCore.uptoOBS(pdf_url, pdf_name, 8, social_code, pathType, taskType, start_time,'LiuLiYuan')
# 附件插入att数据库
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return False
att_id = baseCore.tableUpdate(retData, com_name, year, pdf_name, num)
att_id = baseCore.tableUpdate(retData, com_name, year, pdf_name, num,pub_time)
if att_id:
pass
else:
......@@ -214,38 +214,22 @@ def doJob():
continue
pub_time = format_time(time_list[i])
year = pub_time[:4]
print(name_pdf,pub_time,pdf_url)
# result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time, com_name, num)
# num += 1
# if result:
# # 公告信息列表
# log.info(f'{short_name}==============解析传输操作成功')
# state = 1
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, href, '成功')
# try:
# # 发送kafka成功之后 再插入数据库
# insertMysql(social_code, href, pub_time)
# log.info(f"{social_code}==={href}===数据库插入成功")
# except:
# log.info(f"{social_code}==={href}===数据库插入失败")
# continue
# else:
# continue
# break
# time.sleep(0.5)
# except Exception as e:
# ee = e.__traceback__.tb_lineno
# log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
# break
# except:
# log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, f'获取信息列表失败')
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time, com_name, num)
if result:
# 公告信息列表
log.info(f'{short_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, '成功')
try:
# 发送kafka成功之后 再插入数据库
insertMysql(social_code, href, pub_time)
log.info(f"{social_code}==={href}===数据库插入成功")
except:
log.info(f"{social_code}==={href}===数据库插入失败")
continue
else:
continue
next_flg = soup.select('#con02-7 > table > tr')[1].select('div')[2].text
if '下一页' not in next_flg:
break
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论