Commit 119a9a33  Author: 薛凌堃

11/24

Parent c0b05e59
......@@ -540,19 +540,20 @@ if __name__ == '__main__':
while True:
nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
file_name = f'./data/国内企业基本信息采集情况_{nowtime}.xlsx'
file_name = f'./data/国内企业基本信息采集情况.xlsx'
file.createFile(file_name)
cookieinfo = token.getToken()
id_cookie = cookieinfo[0]
cookie_ = json.loads(cookieinfo[1])
# print(type(cookies))
if cookie_:
if cookieinfo:
pass
else:
log.info('==========已无cookies==========')
time.sleep(30)
continue
id_cookie = cookieinfo[0]
cookie_ = json.loads(cookieinfo[1])
# print(type(cookies))
# cookie_ = json.loads(cookies)
# print(type(cookie_))
log.info(f"获取cookie到----{cookie_}")
......@@ -621,7 +622,7 @@ if __name__ == '__main__':
# exchange = ''
count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange,ynDomestic, countryName, file_name)
time.sleep(40)
time.sleep(2)
# break
# baseCore.r.close()
# baseCore.sendEmail(file_name)
......
# -*- coding: utf-8 -*-
"""
The simulated-click approach does not work here; it requires logging in with an account.
"""
import json
import re
import time
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
from classtool import Token, File, Tag
token = Token()
file = File()
tag = Tag()
# Send the data to Kafka
def sendkafka(post_data):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("enterpriseInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
except:
exception = 'kafka传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
log.info(f"{com_name}--{social_code}--kafka传输失败")
# Merge the basic-info and business-registration fields
def getinfo(dict1,dict2):
# Collect the key sets of the two dicts
keys1 = set(dict1.keys())
keys2 = set(dict2.keys())
# Take the union of the keys
union_keys = keys1 | keys2
# For each key in the union, take the first truthy value from the two dicts to build a new dict
result_dict = {key: dict1.get(key, None) or dict2.get(key, None) for key in union_keys}
return result_dict
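# Merge precedence example (placeholder values): a truthy value from dict1 wins, otherwise dict2 fills it in.
# getinfo({'电话': '010-1234', '官网': ''}, {'官网': 'www.example.com'})
# -> {'电话': '010-1234', '官网': 'www.example.com'} (key order may differ)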
# Scrape the basic contact info
def baseinfo(com_soup):
baseinfo = com_soup.find('div', class_='contact-info')
cominfo_list = baseinfo.find_all('span', class_='f')
data = {}
for cominfo in cominfo_list:
# print(cominfo)
value = cominfo.find('span', class_='val').text.replace('复制', '').strip(' ')
pattern = r'\(\d{4}\s*年\)'
match = re.search(pattern, value)
if match:
# print(match.group(0))
value = value.split(match.group(0))[0]
# print(value)
tag.deletep(cominfo, 'span', 'class', 'val')
tag.deletep(cominfo, 'a', '', '')
tag.deletek(cominfo)
# print(cominfo)
name = cominfo.text.replace('\n', '').replace('复制', '').strip(' ').replace(':', '')
# print(name,value)
data[name] = value
return data
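# The returned dict maps each label in the contact-info block to its value,
# e.g. keys such as '电话', '官网', '邮箱' (whichever labels the page shows).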
# Check the login status
def checklogin(key):
# url = f'https://www.qcc.com/web/search?key=91110108558521630L'
url = f'https://www.qcc.com/web/search?key={key}'
# ip = baseCore.get_proxy()
# req = requests.get(headers=headers, url=url, proxies=ip)
req = requests.get(headers=headers, url=url)
soup = BeautifulSoup(req.content, 'html.parser')
if soup.find('title').text == '会员登录 - 企查查':
log.info('状态---未登录')
soup = ''
return soup
return soup
# Map the scraped fields to the output schema
def dic_handle(result_dic):
zxss = ['北京市', '天津市', '上海市', '重庆市']
try:
company_name = result_dic['企业名称']
except:
company_name = ''
try:
CreditCode = result_dic['统一社会信用代码']
except:
CreditCode = ''
try:
OperName = result_dic['法定代表人']
except:
OperName = ''
try:
PhoneNumber = result_dic['电话']
except:
PhoneNumber = ''
try:
WebSite = result_dic['官网']
except:
WebSite = ''
try:
Email = result_dic['邮箱']
except:
Email = ''
try:
Desc = result_dic['简介']
except:
Desc = ''
try:
Status = result_dic['登记状态']
except:
Status = ''
try:
StartDate = result_dic['成立日期']
except:
StartDate = ''
try:
RecCap = result_dic['实缴资本']
except:
RecCap = ''
try:
RegistCapi = result_dic['注册资本']
except:
RegistCapi = ''
try:
CheckDate = result_dic['核准日期']
except:
CheckDate = ''
try:
OrgNo = result_dic['组织机构代码']
except:
OrgNo = ''
try:
No = result_dic['工商注册号']
except:
No = ''
try:
taxpayerNo = result_dic['纳税人识别号']
except:
taxpayerNo = ''
try:
EconKind = result_dic['企业类型']
except:
EconKind = ''
try:
TermStart = result_dic['营业期限'].split('至')[0]
except:
TermStart = ''
try:
TeamEnd = result_dic['营业期限'].split('至')[1]
except:
TeamEnd = ''
try:
TaxpayerType = result_dic['纳税人资质']
except:
TaxpayerType = ''
try:
SubIndustry = result_dic['国标行业']
except:
SubIndustry = ''
try:
region = result_dic['所属地区']
except:
region = ''
try:
pattern = r'^(.*?省|.*?自治区)?(.*?市|.*?自治州)?(.*?区|.*?县|.*?自治县|.*?市辖区)?(.*?区|.*?县|.*?自治县|.*?市辖区)?$'
matches = re.match(pattern, region)
Province = matches.group(1)
City = matches.group(2)
County = matches.group(3)
if Province is None:
for zxs in zxss:
if zxs in region:
Province = zxs
break
except:
Province = ''
City = ''
County = ''
try:
BelongOrg = result_dic['登记机关']
except:
BelongOrg = ''
try:
Info = result_dic['人员规模']
except:
Info = ''
try:
can_bao = result_dic['参保人数']
except:
can_bao = ''
try:
OriginalName = result_dic['曾用名']
except:
OriginalName = ''
try:
EnglishName = result_dic['英文名']
except:
EnglishName = ''
try:
IxCode = result_dic['进出口企业代码']
except:
IxCode = ''
try:
Address = result_dic['地址']
except:
Address = ''
try:
Scope = result_dic['经营范围']
except:
Scope = ''
aa_dict = {
'name': company_name, # company name
'shortName': '', # company short name
'socialCreditCode': CreditCode, # unified social credit code
'legalPerson': OperName, # legal representative
'officialPhone': PhoneNumber, # phone
'officialUrl': WebSite, # official website
'officialEmail': Email, # email
'briefInfo': Desc, # brief introduction
'registerStatus': Status, # registration status
'incorporationDate': StartDate, # date of incorporation
'capital': RegistCapi, # registered capital
'paidCapital': RecCap, # paid-in capital
'approvalDate': CheckDate, # approval date
'organizationCode': OrgNo, # organization code
'registerNo': No, # business registration number
'taxpayerNo': taxpayerNo, # taxpayer identification number
'type': EconKind, # enterprise type
'businessStartDate': TermStart, # business term start
'businessEndDate': TeamEnd, # business term end
'taxpayerQualification': TaxpayerType, # taxpayer qualification
'industry': SubIndustry, # industry
'region': region,
'province': Province, # province
'city': City, # city
'county': County, # county
'registerDepartment': BelongOrg, # registration authority
'scale': Info, # staff size
'insured': can_bao, # number of insured employees
'beforeName': OriginalName, # former name
'englishName': EnglishName, # English name
'importExportEnterpriseCode': IxCode, # import/export enterprise code
'address': Address, # address
'businessRange': Scope, # business scope
'status': 0, # status
}
return aa_dict
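# The try/except blocks above only guard against missing keys; the same mapping could be
# written more compactly with dict.get and a default, for example:
# company_name = result_dic.get('企业名称', '')
# CreditCode = result_dic.get('统一社会信用代码', '')
# OperName = result_dic.get('法定代表人', '')
# ...and so on for the remaining fields (the region parsing would still need its own try/except).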
# Prepare for collection
def redaytowork(com_name,social_code,securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
# if social_code:
# dic_info = baseCore.getInfomation(social_code)
# elif not social_code:
# dic_info = baseCore.getBYnameInfomation(com_name)
# else:
# dic_info = ''
# if dic_info:
# pass
log.info(f'----当前企业{social_code}-{com_name}--开始处理---')
# count = dic_info[14]
count = 0
# qcc.com company id
# company_id = dic_info[12]
# Search by credit code if available, otherwise search by the company name
if social_code:
soup = checklogin(social_code)
else:
soup = checklogin(com_name)
if not soup:
log.info("登录失效===重新放入redis")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.delete_token(id_cookie)
log.info('=====已重新放入redis,失效cookies已删除======')
time.sleep(20)
return count
else:
try:
searchinfo = soup.find_all('div', class_='npanel-heading')[1].find('span', class_='text-danger').text
except:
log.info("登录失效===重新放入redis")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
time.sleep(20)
return count
if searchinfo == '0':
log.info('=====搜索不到该企业====')
data = [com_name, social_code]
# TODO: companies that cannot be found should be written back to a spreadsheet
file.appenddata(file_name, '需处理企业', data)
return count
else:
# Start collecting
try:
if spiderwork(soup, com_name, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
count += 1
log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
token.updateTokeen(id_cookie,3)
return count
else:
return count
except Exception as e:
log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
return count
def ifbeforename(company_url):
req_ = requests.get(headers=headers, url=company_url)
com_soup = BeautifulSoup(req_.content, 'html.parser')
try:
businessinfo = com_soup.find('div', class_='cominfo-normal')
except:
businessinfo = ''
if businessinfo:
try:
name = businessinfo.find('div', class_='ntag text-gray original-tag').text
value = businessinfo.find('div', class_='original-name-list').text.replace('展开', '').replace(' ', '').replace('…','').replace('\n', '').replace('复制', '').split('(')[0]
except:
name = '曾用名'
value = ''
return value
else:
return ''
# Collect the basic info and the business-registration info
def spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
qccid = company_url.split('firm/')[1].split('.html')[0]
# Update the record with the scraped qcc.com id
updateSql = f"update EnterpriseInfo set QCCID = '{qccid}' where SocialCode = '{social_code}'"
cursor_.execute(updateSql)
cnx_.commit()
# ip = baseCore.get_proxy()
# req_ = requests.get(headers=headers, url=company_url, proxies=ip)
req_ = requests.get(headers=headers, url=company_url)
com_soup = BeautifulSoup(req_.content, 'html.parser')
try:
businessinfo = com_soup.find('div', class_='cominfo-normal')
except:
businessinfo = ''
if businessinfo:
data_businfo = {}
data_baseinfo = baseinfo(com_soup)
# print(data_baseinfo)
try:
name = businessinfo.find('div', class_='ntag text-gray original-tag').text
value = \
businessinfo.find('div', class_='original-name-list').text.replace('展开', '').replace(' ', '').replace('…', '').replace('\n', '').replace('复制', '').split('(')[0]
except:
name = '曾用名'
value = ''
data_businfo[name] = value
td_tags = businessinfo.find_all('td')
# print(td_tags)
for td in td_tags:
if 'class' in td.attrs and 'tb' in td['class']:
div_tags = td.find_all('div')
texts = [div.text for div in div_tags]
if len(texts) > 0:
for text in texts[::-1]:
data_businfo[text.replace('复制', '').replace('\n', '').strip(' ')] = None
else:
data_businfo[td.text.replace('复制', '').replace('\n', '').strip(' ')] = None
else:
# Tags without the class='tb' attribute
att_list = ['inline-block', 'ntag-v2', 'm-l-r-10', 'm-l-sm']
for att in att_list:
tag.deletep(td, 'a', 'class', att)
tag.deletek(td)
tag.deletep(td, 'div', 'class', 'text-gray clearfix original-name-part')
tag.deletespan(td)
# if len(result_dict) <= len(td_tags) // 2:
div_tags = td.find_all('div')
texts = [div.text for div in div_tags if len(div.attrs) == 0]
if len(texts) > 0:
i = 1
for text in texts:
if text == ' ':
continue
data_businfo[list(data_businfo.keys())[-i]] = text.replace('复制', '').replace('\n',
'').replace(
' ', '')
i += 1
else:
if '实缴资本' in td.text:
# pattern = r"\d+万美元"
# match = re.search(pattern, td.text.replace('复制', '').replace('\n', '').replace(' ', ''))
# if match:
# value = match.group()
value = td.text.replace('复制', '').replace('\n', '').replace(' ', '').split('实缴资本')[0]
data_businfo[list(data_businfo.keys())[-1]] = value
else:
data_businfo[list(data_businfo.keys())[-1]] = td.text.replace('复制', '').replace('\n',
'').replace(
' ',
'')
result_dict = getinfo(data_businfo, data_baseinfo)
# print(result_dict)
# Companies collected successfully
data = [com_name, result_dict['企业名称'], social_code, result_dict['统一社会信用代码']]
file.appenddata(file_name, '获取基本信息成功企业', data)
# Convert the field names to English camelCase
aa_dic = dic_handle(result_dict)
aa_dic['qccId'] = qccid
aa_dic['ynDomestic'] = ynDomestic
aa_dic['countryName'] = countryName
aa_dic['securitiesCode'] = securitiesCode
aa_dic['securitiesShortName'] = securitiesShortName
aa_dic['listingDate'] = listingDate
aa_dic['category'] = category
aa_dic['exchange'] = exchange
# print(aa_dic)
sendkafka(aa_dic)
else:
data_baseinfo = baseinfo(com_soup)
# Companies collected successfully
data = [com_name, data_baseinfo['企业名称'], social_code, data_baseinfo['统一社会信用代码']]
file.appenddata(file_name, '获取基本信息成功企业', data)
# Convert the field names to English camelCase
aa_dic = dic_handle(data_baseinfo)
aa_dic['qccId'] = qccid
aa_dic['ynDomestic'] = ynDomestic
aa_dic['countryName'] = countryName
aa_dic['securitiesCode'] = securitiesCode
aa_dic['securitiesShortName'] = securitiesShortName
aa_dic['listingDate'] = listingDate
aa_dic['category'] = category
aa_dic['exchange'] = exchange
sendkafka(aa_dic)
# Check whether the returned name matches the expected one
def spiderwork(soup, receptname, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
company_url = ''
try:
company_list = soup.find('table', class_='app-ltable ntable ntable-list ntable ntable-list')
tr_list = company_list.find_all('tr', class_='tsd0')
except:
log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
return False
# receptname = '小米通讯技术有限公司'
for tr in tr_list:
info_t = tr.find('span',class_='copy-title')
getname = info_t.find('span').text
log.info(f'接收到的企业名称--{receptname}---采到的企业名称--{getname}')
if receptname and getname == receptname:
company_url = info_t.find('a')['href']
break
elif not receptname:
company_url = info_t.find('a')['href']
break
else:
continue
if company_url:
# company_url = 'https://www.qcc.com/firm/80af5085726bb6b9c7770f1e4d0580f4.html'
# company_url = 'https://www.qcc.com/firm/50f75e8a8859e609ec37976f8abe827d.html'
# Collect the basic info and the business-registration info
spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name)
else:
# Check whether the hit is a former name
tr = tr_list[0]
info_t = tr.find('span', class_='copy-title')
getname = info_t.find('span').text
log.info(f'------可能是曾用名------接收到的企业名称--{receptname}---采到的企业名称--{getname}')
company_url = info_t.find('a')['href']
beforename = ifbeforename(company_url)
if beforename == receptname:
spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name)
else:
# No matching company name was found
data = [com_name, social_code]
file.appenddata(file_name, '需处理企业',data)
time.sleep(2)
return False
return True
if __name__ == '__main__':
taskType = '基本信息/企查查'
while True:
nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
file_name = f'./data/国内企业基本信息采集情况_{nowtime}.xlsx'
file.createFile(file_name)
cookieinfo = token.getToken()
if cookieinfo:
pass
else:
log.info('==========已无cookies==========')
time.sleep(30)
continue
id_cookie = cookieinfo[0]
cookie_ = json.loads(cookieinfo[1])
# print(type(cookies))
# cookie_ = json.loads(cookies)
# print(type(cookie_))
log.info(f"获取cookie到----{cookie_}")
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
# 'Cookie': 'qcc_did=046d99c9-566e-4046-9094-689901b79748; UM_distinctid=18aac5b8c21810-046f8431aecf58-26031f51-1fa400-18aac5b8c22efd; CNZZDATA1254842228=109635008-1695108795-https%253A%252F%252Fwww.qcc.com%252F%7C1695113473; _uab_collina=169935323766710839405007; acw_tc=db9062a717000200596487102e63dac7bed6aad2a049361c973816fabf; QCCSESSID=3c95642bd6445b7681c8fc6411',
'Cookie': f'qcc_did={cookie_["qcc_did"]}; acw_tc={cookie_["acw_tc"]}; QCCSESSID={cookie_["QCCSESSID"]}',
'Host': 'www.qcc.com',
'Referer': 'https://www.qcc.com/',
'Sec-Ch-Ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
}
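# The token table is assumed to store the cookie as a JSON string with at least the keys
# used above, e.g. cookieinfo[1] like '{"qcc_did": "...", "acw_tc": "...", "QCCSESSID": "..."}'
# (values are placeholders).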
start_time = time.time()
# Get the enterprise info
company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
# company_field = '91220101606092819L||'
if company_field == 'end':
# This round is finished: send the report email and start the next round
baseCore.sendEmail(file_name)
time.sleep(20)
file.deleteFile(file_name)
continue
if company_field == '' or company_field is None:
# No new companies to collect after this round
file.deleteFile(file_name)
flag = True
while flag:
log.info('--------已没有数据---------')
time.sleep(30)
company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
if company_field:
flag = False
log.info("-----已添加数据------")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode',company_field)
continue
continue
social_code = company_field.split('|')[0]
com_name = company_field.split('|')[2].replace(' ', '')
ynDomestic = company_field.split('|')[15]
countryName = company_field.split('|')[16]
securitiesCode = company_field.split('|')[17]
securitiesShortName = company_field.split('|')[18]
listingDate = company_field.split('|')[21]
category = company_field.split('|')[19]
exchange = company_field.split('|')[20]
# ynDomestic = ''
# countryName = ''
# securitiesCode = ''
# securitiesShortName = ''
# listingDate = ''
# category = ''
# exchange = ''
count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange,ynDomestic, countryName, file_name)
time.sleep(40)
# break
# baseCore.r.close()
# baseCore.sendEmail(file_name)
# After collection, update the collection count for this company
# runType = 'BaseInfoRunCount'
# baseCore.updateRun(social_code, runType, count)
baseCore.close()
\ No newline at end of file
import os
......@@ -90,8 +90,8 @@ def getOBSres(pathType,name, response):
return result
def secrchATT(item_id, retData, type_id):
sel_sql = f"select id from clb_sys_attachment where item_id = '{item_id}' and path = '{retData['path']}' and type_id={type_id} "
cursor_.execute(sel_sql)
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s '''
cursor_.execute(sel_sql, (item_id, retData['path'], type_id))
selects = cursor_.fetchone()
return selects
......@@ -129,7 +129,7 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
print(e)
log.info(f"更新完成:{item_id}===={pdf_name+category}")
try:
selects = secrchATT(item_id, pdf_name, type_id)
selects = secrchATT(item_id, retData, type_id)
except Exception as e:
log.info(e)
id = selects[0]
......@@ -217,8 +217,8 @@ def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_ti
# print(dic_news)
# Send the fields to Kafka for persistence
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopicaaaas",
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
......@@ -342,7 +342,7 @@ def gonggao_info(dic_info):
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
# Insert into the database only after the Kafka send succeeds
insert = InsterInto(social_code, pdf_url, info_date, title)
insert = InsterInto(social_code, info_url, info_date, title)
if insert:
log.info(f'===={social_code}========{title}=====插入库成功')
pass
......@@ -362,8 +362,8 @@ if __name__ =='__main__':
while True:
start_time = time.time()
# Get the enterprise info
# social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode')
social_code = 'ZZSN23030900000316'
social_code = baseCore.redicPullData('NoticeEnterprise:mgqy_socialCode_add')
# social_code = 'ZZSN23030900000316'
if not social_code:
time.sleep(20)
continue
......@@ -378,8 +378,11 @@ if __name__ =='__main__':
code = dic_info[3]
com_name = dic_info[1]
log.info(f'-----开始处理{com_name}----{social_code}------')
gonggao_info(dic_info)
break
try:
gonggao_info(dic_info)
except:
log.info(f'-----error:{com_name}----{social_code}------')
import os
import uuid
import requests,time, json, sys
from kafka import KafkaProducer
from retry import retry
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # Huawei Cloud access key (AK)
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # Huawei Cloud secret key (SK)
server='https://obs.cn-north-1.myhuaweicloud.com' # OBS endpoint of the bucket
)
pathType = 'QYNotice/'
# Convert a file size in bytes to a human-readable string
def convert_size(size_bytes):
# Units for the conversion
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def getuuid():
get_timestamp_uuid = uuid.uuid1() # Generate a UUID from the timestamp, guaranteed globally unique
return get_timestamp_uuid
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers = {}
category = os.path.splitext(pdf_url)[1]
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
break
except:
time.sleep(3)
continue
page_size = 1
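# page_size is fixed at 1 here: the PDF page count is not actually computed in this
# version (fitz is imported but unused), so the page_size < 1 branch below never triggers.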
name = str(getuuid()) + category
try:
result = getOBSres(pathType, name, response)
except:
log.error(f'OBS发送失败')
return retData
if page_size < 1:
# PDF parsing failed
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
# resp = obsClient.putFile('zzsn', pathType + name, file_path='local path of the file to upload')
return result
def secrchATT(item_id, retData, type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s '''
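# The %s placeholders are bound by the MySQL driver in execute(), so values containing
# quotes cannot break the SQL (unlike the earlier f-string version of this query).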
cursor_.execute(sel_sql, (item_id, retData['path'], type_id))
selects = cursor_.fetchone()
return selects
# Insert into the attachment (att) table and return the attachment id
def tableUpdate(retData, com_name, year, pdf_name, num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
# selects = secrchATT(item_id, pdf_name, type_id)
#
# if selects:
# log.info(f'pdf_name:{pdf_name}已存在')
# id = ''
# return id
# else:
try:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name+category, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn')
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
except Exception as e:
print(e)
log.info(f"更新完成:{item_id}===={pdf_name+category}")
try:
selects = secrchATT(item_id, retData, type_id)
except Exception as e:
log.info(e)
id = selects[0]
return id
def InsterInto(social_code, pdf_url,pub_time,pdf_name):
insert = False
# Insert the record into the database
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
list_info = [
social_code,
pdf_url,
'东方财富网',
'1',
pub_time,
pdf_name
]
# database on the 144 server
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
insert = True
return insert
except:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
return insert
def ifInstert(short_name, social_code, pdf_url):
ifexist = True
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='东方财富网' and type='1' '''
cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone()
# Skip it if it already exists in the database
if selects:
ifexist = False
log.info(f'com_name:{short_name}、{pdf_url}已存在')
return ifexist
else:
return ifexist
def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
# Upload to the Huawei Cloud OBS server
retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
# Insert the attachment into the att database table
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return False
num = num + 1
att_id = tableUpdate(retData, com_name, year, pdf_name, num)
if att_id:
pass
else:
return False
content = retData['content']
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '东方财富网',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': info_url, # original article link
'summary': '',
'title': pdf_name.replace('.pdf', ''),
'type': 3,
'socialCreditCode': social_code,
'year': year
}
# print(dic_news)
# Send the fields to Kafka for persistence
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
log.info(dic_result)
return False
def gonggao_info(dic_info):
code = dic_info[3]
com_name = dic_info[1]
social__code = dic_info[2]
if 'HK' in code:
# browser.quit()
return
# code1 = str(code)
# while True:
# if len(code1) < 6:
# code1 = '0' + code1
# else:
# break
# if code1[0] == '0' or code1[0] == '3' or code[0] == '2':
# com_code = 'SZ' + code1
# elif code1[0] == '6' or code1[0] == '9':
# com_code = 'SH' + code1
# elif code1[0] == '8' or code1[0] == '4':
# com_code = 'BJ' + code1
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index=1&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
for n1 in range(0, 3):
try:
res = requests.get(url, verify=False)
break
except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
res_json = res.json()
total_hits = res_json['data']['total_hits']
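# Note: total_hits looks like the total number of announcements while each request returns
# up to page_size=50, so range(1, total_hits + 1) likely iterates far more pages than exist;
# (total_hits + 49) // 50 would be the page count under that assumption.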
for page1 in range(1,total_hits+1):
# url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
for n1 in range(0, 3):
try:
res = requests.get(url, verify=False)
break
except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
res_json = res.json()
list_all = res_json['data']['list']
if list_all:
for one_info in list_all:
title = one_info['title']
info_date = one_info['notice_date']
year = info_date[:4]
# if page1 > 1 and '2022' in info_date:
# break_id = 1
# break
# if '2021' in info_date: # 只采集22年以后的数据
# break_id = 1
# break
try:
info_type = one_info['columns'][0]['column_name']
except:
info_type = ''
art_code = one_info['art_code']
info_url = 'https://data.eastmoney.com/notices/detail/' + code + '/' + art_code + '.html'
t = int(time.time() * 1000)
json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
for n1 in range(0, 3):
try:
json_2 = requests.get(json_url, verify=False).json()
break
except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
try:
pdf_url = json_2['data']['attach_url']
except:
pdf_url = ''
try:
info_content = json_2['data']['notice_content']
except:
info_content = ''
ifexist = ifInstert(com_name, social_code, info_url)
if ifexist:
# Parse the PDF content: get the PDF link first, download it, then record parse success/failure and transfer success/failure
result = GetContent(pdf_url, info_url,title, social_code, year, info_date, start_time, com_name, num)
if result:
# Announcement list
log.info(f'{com_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
# Insert into the database only after the Kafka send succeeds
insert = InsterInto(social_code, info_url, info_date, title)
if insert:
log.info(f'===={social_code}========{title}=====插入库成功')
pass
else:
continue
else:
log.info(f'======={com_name}========{code}===已存在')
continue
if __name__ =='__main__':
# Read the social_code from redis
list_c = []
list_all_info_1 = []
num = 0
taskType = '企业公告/东方财富网'
while True:
start_time = time.time()
# Get the enterprise info
social_code = baseCore.redicPullData('NoticeEnterprise:mgqy_socialCode_add')
# social_code = 'ZZSN23030900000316'
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
code = dic_info[3]
com_name = dic_info[1]
log.info(f'-----开始处理{com_name}----{social_code}------')
try:
gonggao_info(dic_info)
except:
log.info(f'-----error:{com_name}----{social_code}------')