Commit 060ce7c4 Author: 薛凌堃

2023/8/12

Parent aedff657
......@@ -92,135 +92,142 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
short_name = dic_info[4]
soup = RequestUrl(url, payload, item_id, start_time)
if soup == '':
return
return False
# Get the page count first
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
total = re.findall(r'\d+', page)[0]
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
Maxpage = int(total) // 15 + 1
# The first page's link differs from the other pages; adjust the link accordingly while iterating
for i in range(1, Maxpage + 1):
if i == 1:
href = url
page = 0
try:
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
except:
e = f"该企业没有{dic_parms['Catagory2']}数据"
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, dic_parms['url'], f'{e}')
return False
if page != 0:
total = re.findall(r'\d+', page)[0]
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, item_id, start_time)
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
for tr in tr_list[1:]:
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
# print(name)
report_type = td_list[4].text.strip()
# print(report_type)
if report_type == '年报':
if '摘要' in name_pdf:
continue
# The year is still extracted from the PDF file name
try:
year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
except Exception as e:
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
year = int(pub_time) - 1
year = str(year)
page_size = 0
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s'''
cursor_.execute(sel_sql, (item_id, year))
selects = cursor_.fetchone()
if selects:
print(f'com_name:{short_name}、{year}已存在')
continue
else:
# If the type is an annual report, parse the annual report PDF and store it in the database
for i in range(0, 3):
try:
resp_content = requests.request("GET", pdf_url).content
# Get the PDF page count
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
break
except Exception as e:
print(e)
time.sleep(3)
continue
if page_size < 1:
# PDF parsing failed
print(f'==={short_name}、{year}===pdf解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
Maxpage = int(total) // 15 + 1
# The first page's link differs from the other pages; adjust the link accordingly while iterating
for i in range(1, Maxpage + 1):
if i == 1:
href = url
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, item_id, start_time)
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
for tr in tr_list[1:]:
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
# print(name)
report_type = td_list[4].text.strip()
# print(report_type)
if report_type == '年报':
if '摘要' in name_pdf:
continue
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
print(e)
time.sleep(3)
continue
if result == '':
e = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
# The year is still extracted from the PDF file name
try:
year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
except Exception as e:
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
year = int(pub_time) - 1
year = str(year)
page_size = 0
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s'''
cursor_.execute(sel_sql, (item_id, year))
selects = cursor_.fetchone()
if selects:
print(f'com_name:{short_name}、{year}已存在')
continue
if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
type_id = '1'
item_id = item_id
group_name = 'group1'
path = bytes.decode(result['Remote file_id']).replace('group1', '')
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
page_size = page_size
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
category, file_size, order_by, status, create_by, create_time, page_size)
state = 1
else:
# If the type is an annual report, parse the annual report PDF and store it in the database
for i in range(0, 3):
try:
resp_content = requests.request("GET", pdf_url).content
# Get the PDF page count
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
break
except Exception as e:
print(e)
time.sleep(3)
continue
if page_size < 1:
# PDF parsing failed
print(f'==={short_name}、{year}===pdf解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
except:
e = '数据库传输失败'
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
continue
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
print(e)
time.sleep(3)
continue
if result == '':
e = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
num = num + 1
time.sleep(2)
else:
e = '采集失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
else:
continue
continue
if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
type_id = '1'
item_id = item_id
group_name = 'group1'
path = bytes.decode(result['Remote file_id']).replace('group1', '')
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
page_size = page_size
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
category, file_size, order_by, status, create_by, create_time, page_size)
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
except:
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
num = num + 1
time.sleep(2)
else:
e = '采集失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
else:
continue
return True
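The block above accepts the FastDFS upload only when 'Remote file_id' and 'Uploaded size' appear in the result, then decodes the stored path before writing the attachment row. Below is a minimal sketch of how that check could be factored out, assuming only the result-dict keys the diff itself uses; the helper name, the bytes check, and the return shape are illustrative, not part of the commit:

```python
from typing import Optional

def parse_upload_result(result: dict) -> Optional[tuple]:
    """Hypothetical helper: extract (path, full_path, file_size) from an
    upload_by_buffer() result, or return None if the upload looks bad."""
    if 'Remote file_id' not in result or 'Uploaded size' not in result:
        return None
    full_path = result['Remote file_id']
    if isinstance(full_path, bytes):        # the diff decodes bytes here
        full_path = full_path.decode()
    path = full_path.replace('group1', '')  # strip the storage group prefix, as above
    return path, full_path, result['Uploaded size']
```

A dict-key check is equivalent to the string-containment test in the diff but avoids stringifying the whole result.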
def getUrl(code, url_parms, Catagory2_parms):
# Shenzhen Stock Exchange
......
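Both hunks derive Maxpage from the record total with a remainder check against the 15-row page size. For reference, a minimal equivalent sketch using ceiling division; the constant and function name are illustrative only:

```python
import math

ROWS_PER_PAGE = 15  # each CSRC list page shows 15 rows, per the modulo logic in the diff

def max_page(total: str) -> int:
    """Hypothetical helper: ceiling division, equivalent to the % / // branches above."""
    return math.ceil(int(total) / ROWS_PER_PAGE)

# e.g. max_page('31') == 3 and max_page('30') == 2, matching the original branches
```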
......@@ -261,76 +261,78 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获
if soup == '':
return False
# Get the page count first
page = 0
try:
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
except:
e = f"该企业没有{dic_parms['Catagory2']}数据"
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, dic_parms['url'], 'Kafka操作失败')
baseCore.recordLog(social_code, taskType, state, takeTime, dic_parms['url'], f'{e}')
return False
total = re.findall(r'\d+', page)[0]
if page != 0:
total = re.findall(r'\d+', page)[0]
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
Maxpage = int(total) // 15 + 1
log.info(f'{short_name}====={code}===========一共{total}条,{Maxpage}页')
# The first page's link differs from the other pages; adjust the link accordingly while iterating
for i in range(1, Maxpage + 1):
log.info(f'==========正在采集第{i}页=========')
if i == 1:
href = url
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, social_code, start_time)
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
pageIndex = 0
for tr in tr_list[1:]:
pageIndex += 1
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
year = pub_time[:4]
report_type = td_list[4].text.strip()
# Insert the information into the database
insert = InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type)
log.info(f'======={short_name}========{code}===插入公告库成功')
if insert:
# # Announcement info list
# okCount = okCount + 1
# Parse the PDF content: get the PDF link first, then download; parsing may succeed or fail, and the transfer may succeed or fail
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time)
if result:
# Announcement info list
okCount = okCount + 1
log.info(f'{short_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '')
pass
else:
errorCount += 1
# time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.error(f'{short_name}=============解析或传输操作失败')
# try:
# insert_err_sql = f"insert into dt_err(xydm,`from`,url,title,pub_date,zhaiyao,create_date,state,pageNo,pageIndex,type) values('{social_code}','证监会','{pdf_url}','{name_pdf}','{pub_time}',' ',now(),1,{i},{pageIndex},'1')"
# cursor_.execute(insert_err_sql)
# cnx_.commit()
# except:
# pass
continue
Maxpage = int(total) // 15 + 1
log.info(f'{short_name}====={code}===========一共{total}条,{Maxpage}页')
# The first page's link differs from the other pages; adjust the link accordingly while iterating
for i in range(1, Maxpage + 1):
log.info(f'==========正在采集第{i}页=========')
if i == 1:
href = url
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, social_code, start_time)
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
pageIndex = 0
for tr in tr_list[1:]:
pageIndex += 1
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
year = pub_time[:4]
report_type = td_list[4].text.strip()
# Insert the information into the database
insert = InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type)
log.info(f'======={short_name}========{code}===插入公告库成功')
if insert:
# # Announcement info list
# okCount = okCount + 1
# Parse the PDF content: get the PDF link first, then download; parsing may succeed or fail, and the transfer may succeed or fail
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time)
if result:
# Announcement info list
okCount = okCount + 1
log.info(f'{short_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '')
pass
else:
errorCount += 1
# time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.error(f'{short_name}=============解析或传输操作失败')
# try:
# insert_err_sql = f"insert into dt_err(xydm,`from`,url,title,pub_date,zhaiyao,create_date,state,pageNo,pageIndex,type) values('{social_code}','证监会','{pdf_url}','{name_pdf}','{pub_time}',' ',now(),1,{i},{pageIndex},'1')"
# cursor_.execute(insert_err_sql)
# cnx_.commit()
# except:
# pass
continue
return True
#state2
......
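Both versions of the function repeat a three-attempt loop that downloads the PDF with requests and reads its page count with PyMuPDF. A minimal sketch of that pattern as a standalone helper, assuming the same libraries the diff already calls; the function name, timeout, and the "0 on failure" convention are assumptions, not part of the commit:

```python
import time
import requests
import fitz  # PyMuPDF ("fitz"), used in the diff above

def fetch_pdf_page_count(pdf_url: str, retries: int = 3) -> int:
    """Hypothetical helper: download a PDF and return its page count,
    retrying up to `retries` times; return 0 if every attempt fails."""
    for _ in range(retries):
        try:
            content = requests.get(pdf_url, timeout=30).content
            with fitz.open(stream=content, filetype='pdf') as doc:
                return doc.page_count
        except Exception as exc:  # network error or unreadable PDF
            print(exc)
            time.sleep(3)
    return 0
```

A caller can then treat a page count below 1 as the "pdf解析失败" case that both hunks log.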