Commit 3514653b  Author: 薛凌堃

10/17

Parent 89859afd
@@ -55,26 +55,27 @@ def closeSql(cnx,cursor):
 def NewsEnterprise():
     cnx,cursor = connectSql()
     # # Get domestic enterprises
-    # gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
-    # cursor.execute(gn_query)
-    # gn_result = cursor.fetchall()
+    gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
+    cursor.execute(gn_query)
+    gn_result = cursor.fetchall()
     # Get foreign enterprises
-    gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
-    cursor.execute(gw_query)
-    gw_result = cursor.fetchall()
+    # gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
+    # cursor.execute(gw_query)
+    # gw_result = cursor.fetchall()
     cnx.commit()
-    gw_social_list = [item[0] for item in gw_result]
+    # gw_social_list = [item[0] for item in gw_result]
     # todo: print the length
     # print(len(gw_social_list))
-    # gn_social_list = [item[0] for item in gn_result]
+    gn_social_list = [item[0] for item in gn_result]
     print('=======')
     # Push the codes into redis
-    # for item in gn_social_list:
-    #     r.rpush('NewsEnterprise:gnqy_socialCode', item)
-    for item in gw_social_list:
-        r.rpush('NewsEnterprise:gwqy_socialCode', item)
-    closeSql(cnx,cursor)
+    for item in gn_social_list:
+        r.rpush('NewsEnterprise:gnqy_socialCode', item)
+        # r.rpush('NewsEnterprise:gnqybuchong_socialCode', item)
+    # for item in gw_social_list:
+    #     r.rpush('NewsEnterprise:gwqy_socialCode', item)
+    # closeSql(cnx,cursor)
 # Scheduled task for enterprise news
 def NewsEnterprise_task():
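Note: these loader functions only fill the Redis lists; the spiders drain them elsewhere (e.g. via baseCore.redicPullData). A minimal stand-alone sketch of the consuming side, assuming the same redis-py client `r` and key name as above (`consume` is a hypothetical name, not part of this repo):

def consume(key='NewsEnterprise:gnqy_socialCode'):
    # rpush + lpop gives FIFO order; returns None once the list is drained
    while True:
        item = r.lpop(key)
        if item is None:
            break
        social_code = item.decode() if isinstance(item, bytes) else item
        print(f'processing {social_code}')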
@@ -323,14 +324,15 @@ def BaseInfoAbroad_task():
 def SEC_CIK():
     cnx,cursor = connectSql()
-    cik_query = "select cik from mgzqyjwyh_list where state=2"
+    # cik_query = "select cik from mgzqyjwyh_list where state=2"
+    cik_query = "select cik from mgzqyjwyh_list where state = 2 and not exists (SELECT social_credit_code FROM brpa_source_article WHERE origin = 'SEC' AND type = 1 AND mgzqyjwyh_list.xydm = brpa_source_article.social_credit_code)"
     cursor.execute(cik_query)
     cik_result = cursor.fetchall()
     cnx.commit()
     cik_list = [item[0] for item in cik_result]
     print('=====')
     for item in cik_list:
-        r.rpush('Sec_cik_US:uscik_baseinfo',item)
+        # r.rpush('Sec_cik_US:uscik_baseinfo',item)
         r.rpush('Sec_cik_US:uscik_annualReport', item)
     closeSql(cnx,cursor)
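Note: the rewritten cik_query skips companies that already have a SEC article via a correlated NOT EXISTS. The same anti-join can be phrased as a LEFT JOIN ... IS NULL; a hedged equivalent using the same tables and columns (assuming xydm/social_credit_code are non-null join keys):

cik_query = (
    "select m.cik from mgzqyjwyh_list m "
    "left join brpa_source_article a "
    "  on a.social_credit_code = m.xydm and a.origin = 'SEC' and a.type = 1 "
    "where m.state = 2 and a.social_credit_code is null"
)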
@@ -453,57 +455,89 @@ def omeng():
 # Single-item champion enterprises
 def danxiangguanjun():
     cnx, cursor = connectSql()
-    query = "SELECT CompanyName FROM champion"
+    query = "SELECT SocialCode FROM champion limit 10"
     cursor.execute(query)
     result = cursor.fetchall()
     cnx.commit()
     com_namelist = [item[0] for item in result]
     for item in com_namelist:
-        r.rpush('champion:baseinfo',item)
+        r.rpush('champion:news',item)
 # Sci-tech reform demonstration enterprises
 def kegaishifan():
     cnx, cursor = connectSql()
-    query = "SELECT CompanyName FROM technological"
+    query = "SELECT SocialCode FROM technological limit 10"
     cursor.execute(query)
     result = cursor.fetchall()
     cnx.commit()
     com_namelist = [item[0] for item in result]
     for item in com_namelist:
-        r.rpush('technological:baseinfo',item)
+        r.rpush('technological:news',item)
 # "Double hundred" enterprises
 def shuangbaiqiye():
     cnx, cursor = connectSql()
-    query = "SELECT CompanyName FROM Hundred"
+    query = "SELECT SocialCode FROM Hundred limit 10 "
     cursor.execute(query)
     result = cursor.fetchall()
     cnx.commit()
     com_namelist = [item[0] for item in result]
     for item in com_namelist:
-        r.rpush('hundred:baseinfo', item)
+        r.rpush('hundred:news', item)
 # "Specialized and sophisticated" (专精特新) enterprises
 def zhuangjingtexind():
-    pass
+    cnx, cursor = connectSql()
+    query = "SELECT SocialCode FROM specialed limit 10"
+    cursor.execute(query)
+    result = cursor.fetchall()
+    cnx.commit()
+    com_namelist = [item[0] for item in result]
+    for item in com_namelist:
+        r.rpush('specialed:news', item)
+# Unicorn enterprises
+def dujioashou():
+    cnx, cursor = connectSql()
+    query = "SELECT SocialCode FROM dujiaoshou"
+    cursor.execute(query)
+    result = cursor.fetchall()
+    cnx.commit()
+    com_namelist = [item[0] for item in result]
+    for item in com_namelist:
+        r.rpush('dujiaoshou:news', item)
+def fbspdfurlinfo():
+    cnx, cursor = connectSql()
+    query = "SELECT id FROM omengpdfinfo where state is null"
+    cursor.execute(query)
+    result = cursor.fetchall()
+    cnx.commit()
+    com_namelist = [item[0] for item in result]
+    for item in com_namelist:
+        r.rpush('ompdfurlinfo:id', item)
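Note: danxiangguanjun, kegaishifan, shuangbaiqiye, zhuangjingtexind, dujioashou, and fbspdfurlinfo now differ only in the SELECT and the Redis key. A hedged consolidation sketch (`push_list` is a hypothetical helper reusing connectSql/closeSql/r from this module; it also adds the closeSql call the originals skip):

def push_list(query, redis_key):
    # select one column and push every value onto the given redis list
    cnx, cursor = connectSql()
    cursor.execute(query)
    rows = cursor.fetchall()
    cnx.commit()
    for (value,) in rows:
        r.rpush(redis_key, value)
    closeSql(cnx, cursor)

# e.g. push_list("SELECT SocialCode FROM champion limit 10", 'champion:news')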
 if __name__ == "__main__":
     start = time.time()
+    fbspdfurlinfo()
     # danxiangguanjun()
     # kegaishifan()
-    shuangbaiqiye()
+    # shuangbaiqiye()
+    # zhuangjingtexind()
     # NoticeEnterprise()
     # AnnualEnterpriseIPO()
     # AnnualEnterprise()
     # BaseInfoEnterpriseAbroad()
     # NewsEnterprise_task()
     # NewsEnterprise()
+    # dujioashou()
     # BaseInfoEnterprise()
     # FBS()
     # MengZhi()
     # NQEnterprise()
     # SEC_CIK()
-    # dujioashou()
     # omeng()
     # AnnualEnterpriseUS()
     # NoticeEnterprise_task()
......
-# -*- coding: utf-8 -*-
+# coding: utf-8
-import redis
-import time
-from urllib.parse import quote
-import pymongo
-import requests
-from bson.objectid import ObjectId
 import json
-from pyquery import PyQuery as pq
-import urllib3
-import hashlib
-from kafka import KafkaProducer
-import pandas as pd
-import zhconv
-from base.BaseCore import BaseCore
-baseCore = BaseCore()
-log = baseCore.getLogger()
-# Get the Qichacha (qcc) id from a company name or credit code
-def find_id_by_name(name):
-    urllib3.disable_warnings()
-    qcc_key = name
-    t = str(int(time.time()) * 1000)
-    headers['Qcc-Timestamp'] = t
-    url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
-    for lll in range(1, 6):
-        try:
-            resp_dict = requests.get(url=url, headers=headers, verify=False).json()
-            break
-        except:
-            print('重试')
-            time.sleep(5)
-            continue
-    time.sleep(2)
-    if resp_dict['result']['Result']:
-        result_dict = resp_dict['result']['Result'][0]
-        KeyNo = result_dict['KeyNo']
-        Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
-        if Name == '':
-            KeyNo = ''
-    else:
-        KeyNo = ''
-    print("{},企业代码为:{}".format(qcc_key, KeyNo))
-    return KeyNo
-# Check whether a string contains any digit
-def str_have_num(str_num):
-    panduan = False
-    for str_1 in str_num:
-        ppp = str_1.isdigit()
-        if ppp:
-            panduan = ppp
-    return panduan
-# Get the company's official website from its qcc id
-def info_by_id(com_id,com_name):
-    aa_dict_list = []
-    t = str(int(time.time()) * 1000)
-    headers['Qcc-Timestamp'] = t
-    url = "https://xcx.qcc.com/mp-weixin/forwardApp/v1/ent/detail?token={}&t={}&unique={}".format(token, t, com_id)
-    resp_dict = requests.get(url=url, headers=headers, verify=False).json()
-    time.sleep(2)
-    try:
-        result_dict = resp_dict['result']['Company']
-    except:
-        print(com_name + ":获取失败")
-    try:
-        WebSite = result_dict['companyExtendInfo']['WebSite']
-    except:
-        WebSite = None
-    if WebSite is None:
-        try:
-            WebSite = result_dict['ContactInfo']['WebSite'][0]['Url']
-        except:
-            WebSite = ''
-    print(com_name + ":爬取完成")
-    return WebSite
+from kafka import KafkaProducer
-headers = {
-    'Host': 'xcx.qcc.com',
-    'Connection': 'keep-alive',
-    'Qcc-Platform': 'mp-weixin',
-    'Qcc-Timestamp': '',
-    'Qcc-Version': '1.0.0',
-    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
-    'content-type': 'application/json',
-    'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
-    'Accept-Encoding': 'gzip, deflate, br,'
-}
-# TODO: the token must be re-captured and updated roughly every two hours
-token = '1dcc61d85177733298e5827653706f1a'  # re-capture roughly every two hours
-start = time.time()
-list_weicha = []
-# file of companies waiting to be collected
-filename = 'data/内蒙古市属国有企业_官网.xlsx'
-df_all = pd.read_excel('data/内蒙古市属国有企业.xls',dtype=str)
-list_all_info = []
-for num_df in range(162,len(df_all)):
-    # unified social credit code
-    id_code = str(df_all['本企业代码'][num_df])
-    # company name
-    com_name = str(df_all['企业名称'][num_df])
-    # row number
-    line = str(df_all['行次'][num_df])
-    dic_com = {
-        'line': line,
-        'social_code': id_code,
-        'com_name': id_code,
-        'website':''
-    }
-    company_id = find_id_by_name(id_code)
-    if company_id == "":
-        print(com_name + ":企业ID获取失败")
-        list_weicha.append(com_name + ":企业ID获取失败")
-        continue
-    WebSite = info_by_id(company_id,com_name)
-    dic_com['website'] = WebSite
-    log.info(f'---{num_df}-------{com_name}----------耗时{baseCore.getTimeCost(start,time.time())}')
-    list_all_info.append(dic_com)
-    baseCore.writerToExcel(list_all_info,filename)
+string_ = """浙江省三建建设集团有限公司是浙江省属国企浙建集团旗下重点企业,创建于1978年1月,拥有建筑工程总承包特级资质和行业甲级设计资质,集“投融建管营”于一体,具有较完整的产业链和专业优势。集团致力成为行业工程总承包建设和服务领域的领跑者,下设工程公司、专业公司、区域公司、投资企业等机构20余家,承建了公共建筑、工业厂房、商用住宅等国家、省、市重点工程上千项,创出了杭州西湖文化广场、国际会议中心等一批经典工程,施工区域辐射四川、陕西、江苏、海南等10多个省市以及阿尔及利亚市场, 年经营规模200亿元以上。"""
+string_.encode('utf-8')
+aa_dict = {
+    'qccId': '',  # qcc company id
+    'name': '浙江省三建建设集团有限公司',  # company name
+    'shortName': '',  # short name
+    'socialCreditCode': '91330000142936138F',  # unified social credit code
+    'legalPerson': '郑晖',  # legal representative
+    'officialPhone': '18757503711',  # phone
+    'officialUrl': 'http://www.ccjsjt.cn/',  # official website
+    'officialEmail': 'ccjsjt@ccjsjt.cn',  # email
+    'briefInfo': string_,
+    'registerStatus': '',  # registration status
+    'incorporationDate': ' 1978-01-01',  # date of incorporation
+    'capital': '680393449',  # registered capital
+    'paidCapital': '680393449',  # paid-in capital
+    'approvalDate': '2023-03-23',  # approval date
+    'organizationCode': '14293613-8',  # organization code
+    'registerNo': '330000000015456',  # business registration number
+    'taxpayerNo': '91330000142936138F',  # taxpayer id
+    'type': '',  # company type
+    'businessStartDate': '1978-01-01',  # business term from
+    'businessEndDate': ' 无固定期限',  # business term to
+    'taxpayerQualification': '增值税一般纳税人',  # taxpayer qualification
+    'industry': '房屋建筑业',  # industry
+    'province': '浙江省',  # province
+    'city': '杭州市',  # city
+    'county': '上城区',  # county
+    'registerDepartment': ' 浙江省市场监督管理局',  # registration authority
+    'scale': '',  # staff size
+    'insured': '2425',  # number of insured employees
+    'beforeName': '浙江省长城建设集团有限公司',  # former name
+    'englishName': '',  # English name
+    'importExportEnterpriseCode': '',  # import/export enterprise code
+    'address': '浙江省杭州市上城区雷霆路60号长城大厦',  # address
+    'businessRange': "承包通用工业与民用建设项目的建筑施工,市政工程、消防工程、防水防腐工程施工,特种建筑技术开发、施工,建筑装饰装璜,水、暧、风、电设备安装,锅炉安装,电梯维修和安装(以上凭资质证书经营)。净化工程、地基基础、建筑幕墙施工;彩钢板安装。建筑、装饰材料、金属材料、木制品、五金、照明电器、电线电缆的销售;木制品加工;汽车维修;仓储;自有建筑设备、钢模租赁;建筑技术咨询。开展对外经济技术合作业务;承包境外工业与民用建筑工程及境内国际招标工程;上述境外工程所需的设备、材料出口;对外派遣实施上述境外工程所需的劳务人员(凭外经贸部门批准文件)。",  # business scope
+    'status': 0,  # status
+}
+try:
+    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
+    kafka_result = producer.send("regionInfo", json.dumps(aa_dict, ensure_ascii=False).encode('utf8'))
+    print(kafka_result.get(timeout=10))
+except:
+    exception = 'kafka传输失败'
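Note: in the new test script, kafka-python's send() is asynchronous; the .get(timeout=10) on the returned future is what forces the send to complete. A hedged variant that also flushes and closes the producer (same broker address and topic as above):

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
try:
    future = producer.send("regionInfo", json.dumps(aa_dict, ensure_ascii=False).encode('utf8'))
    print(future.get(timeout=10))   # blocks until the broker acks (or raises)
finally:
    producer.flush()                # drain any buffered records
    producer.close()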
@@ -657,7 +657,9 @@ class BaseCore:
     def secrchATT(self, item_id, year, type_id,path):
         sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s and path=%s'''
         self.cursor_.execute(sel_sql, (item_id, year, type_id,path))
-        selects = self.cursor_.fetchone()
+        select = self.cursor_.fetchall()
+        print(f'共找到{len(select)}条数据')
+        selects = select[-1]
         return selects
     def deliteATT(self,id):
         delitesql = f"delete from clb_sys_attachment where id = '{id}' "
@@ -676,6 +678,7 @@ class BaseCore:
         status = retData['status']
         create_by = retData['create_by']
         page_size = retData['page_size']
+        print(f'---入库时page_size={page_size}---')
         create_time = retData['create_time']
         order_by = num
         # selects = self.secrchATT(item_id, year, type_id)
@@ -691,7 +694,7 @@ class BaseCore:
         values = (
             year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
             status, create_by,
-            create_time, page_size,path,'zzsn',create_time)
+            create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn',pub_time)
         self.cursor_.execute(Upsql, values)  # insert
         self.cnx_.commit()  # commit
@@ -699,6 +702,7 @@ class BaseCore:
         selects = self.secrchATT(item_id, year, type_id,path)
         id = selects[0]
+        print(f'获取的id为{id}')
         return id
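Note: the new secrchATT takes select[-1] from fetchall(), which depends on the server's default row order and raises IndexError when nothing matches (fetchone() returned None instead). Pushing the "latest row" choice into SQL is deterministic; a hedged alternative over the same table and parameters:

sel_sql = '''select id from clb_sys_attachment
             where item_id = %s and year = %s and type_id = %s and path = %s
             order by id desc limit 1'''
self.cursor_.execute(sel_sql, (item_id, year, type_id, path))
selects = self.cursor_.fetchone()   # newest matching row, or None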
 # Update the company's CIK
@@ -755,7 +759,7 @@ class BaseCore:
         except:
             time.sleep(3)
             continue
+        page_size = 0
         name = name_pdf + '.pdf'
         now_time = time.strftime("%Y-%m")
         try:
......
"""
根据排名 找到信用代码
"""
import os
import re
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
file_path = 'D:\\BaiduNetdiskDownload'
file_list = os.listdir(file_path)
# print(pdf_list)
num = 1
for file in file_list:
log.info(f'-----------当前文件{file}---------------')
if '.zip' not in file:
pass
else:
continue
pdf_list = os.listdir(file_path + '\\' + file)
log.info(f'当前文件夹有{len(pdf_list)}个文件')
for pdf in pdf_list:
# print(pdf)
# get the rank from the file name
try:
id = pdf.split('-')[0]
ename = pdf.split('-')[2]
ename = re.split(r'\.pdf', ename, flags=re.IGNORECASE)[0]
selectsql = f"select * from rankandcode where id = {id} and ename='{ename}'"
cursor.execute(selectsql)
result = cursor.fetchone()
if result:
pass
# print(f'信用代码为:{result[1]}')
else:
print(f'错误匹配:{id}----{ename}')
except:
log.info(f'文件夹--{file} pdf---{pdf}')
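Note: the `if '.zip' not in file: pass else: continue` pair reads inverted. A hedged, more direct guard with the same behavior (skip the zip archives, descend into everything else):

for file in file_list:
    log.info(f'-----------当前文件{file}---------------')
    if '.zip' in file:      # skip archives; process the extracted folders
        continue
    pdf_list = os.listdir(file_path + '\\' + file)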
"""
读取文件 path = 'D:\kkwork\zzsn_spider\data\'
"""
import json
import os
import time
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
from retry import retry
obsClient = ObsClient(
    access_key_id='VEHN7D0TJ9316H8AHCAV',  # your Huawei Cloud AK
    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud SK
    server='https://obs.cn-north-1.myhuaweicloud.com'  # your bucket's endpoint
)
import requests
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
pathType = 'QYYearReport/'
type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
file_path = 'D:\\BaiduNetdiskDownload\\中国年报下载'
def sendKafka(dic_news):
start_time = time.time()
try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("policy",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# sent successfully; record it in the log
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, dic_news['title'], 'Kafka操作失败')
log.info(dic_result)
return False
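Note: sendKafka constructs a new KafkaProducer for every message, which opens fresh broker connections per call. A hedged sketch reusing one module-level producer (same broker address and topic; stripped to the send path, without the logging/recordLog bookkeeping above):

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])  # built once at import time

def sendKafka(dic_news):
    # reuse the shared producer instead of reconnecting per call
    kafka_result = producer.send("policy", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    return True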
def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
"""
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': file_size, 'status': 1, 'create_by': create_by,
'create_time': '', 'page_size': page_size, 'content': content}
"""
state = retData['state']
type_id = retData['type_id']
social_code = retData['item_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
create_time = retData['create_time']
page_size = retData['page_size']
content = retData['content']
retData_f = {'state': state, 'type_id': type_id, 'item_id': social_code, 'group_name': group_name, 'path': path,
'full_path': full_path,
'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by,
'create_time': create_time, 'page_size': page_size, 'content': content}
try:
result = getOBSres(pathType, file_name, pdf_path)
except:
log = baseCore.getLogger()
log.error(f'OBS发送失败')
return retData
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData_f['state'] = True
retData_f['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
retData_f['full_path'] = unquote(result['body']['objectUrl'])
retData_f['create_time'] = time_now
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_path, f'{e}')
return retData_f
return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
# result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
result = obsClient.putFile('zzsn', pathType+name, file_path=response)
return result
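Note: @retry(tries=3, delay=1) from the `retry` package re-invokes getOBSres up to three times in total, sleeping one second between attempts, and re-raises the last exception; the try/except around the uptoOBS call is what finally catches it. Roughly equivalent to this manual loop (hypothetical name, same obsClient call):

def getOBSres_manual(pathType, name, response):
    # manual equivalent of @retry(tries=3, delay=1)
    for attempt in range(3):
        try:
            return obsClient.putFile('zzsn', pathType + name, file_path=response)
        except Exception:
            if attempt == 2:   # last attempt: propagate
                raise
            time.sleep(1)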
if __name__=='__main__':
log.info(f'-----------当前文件{file_path}---------------')
file_list = os.listdir(file_path)
# print(pdf_list)
num = 1
for file in file_list:
start_time = time.time()
pdf_path = file_path + '/'+file
file_rank = int(file.split('-')[0])
file_year = file.split('-')[1]
# match file_rank to the company's credit code
selectsql = f"select * from rankandcode where id = {file_rank}"
cursor.execute(selectsql)
data = cursor.fetchone()
cnx.commit()
social_code = data[1]
ename = data[2]
cname = data[3]
file_name = ename + ':' + file_year + '年年度报告' + '.pdf'
content = ''
# parse the page count and text content
log.info(f"-----------正在处理{file_name}--------------")
with open(pdf_path, 'rb') as file:
byte_stream = file.read()
# print(byte_stream)
try:
with fitz.open(stream=byte_stream, filetype='pdf') as doc:
# page_size = doc.pageCount
page_size = doc.page_count
print(page_size)
for page in doc.pages():
content += page.get_text()
# print(content)
except Exception as e:
log.info(f'文件已损坏:{cname}')
continue
# compute the file size
file_size = os.path.getsize(pdf_path)
file_size = baseCore.convert_size(file_size)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': file_size, 'status': 1, 'create_by': create_by,
'create_time': time_now, 'page_size': page_size, 'content': content}
# upload the file to the file server
try:
retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path)
if retData_f['state']:
#retData, com_name, year, pdf_name, num, pub_time
att_id= baseCore.tableUpdate(retData_f, cname,file_year,file_name, num,file_year+'-12-31')
if att_id:
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '企业官网',
'publishDate': file_year + '-12-31',
'sid': '1684032033495392257',
'sourceAddress': '', # 原文链接
'summary': '',
'title': file_name,
'type': 1,
'socialCreditCode': social_code,
'year': file_year
}
if sendKafka(dic_news):
log.info(f'成功-{file_rank}--{file_name}----{att_id}---{social_code}')
num += 1
else:
log.info(f'失败-{file_rank}--{file_name}----{att_id}---{social_code}')
# delete the inserted attachment row; 400 means the send failed
baseCore.deliteATT(att_id)
log.info(f'已删除插入附件表的数据---{file_name}-----{social_code}')
except Exception as e:
log.info(f'error------{e}')
\ No newline at end of file
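Note: inside the loop above, `with open(pdf_path, 'rb') as file:` rebinds the loop variable `file`. It happens after the last use of the name in that iteration, so it works, but renaming the handle avoids the trap. A hedged touch-up:

with open(pdf_path, 'rb') as fh:    # don't shadow the `file` loop variable
    byte_stream = fh.read()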
@@ -104,8 +104,7 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_url):
     # headers['User-Agent'] = baseCore.getRandomUserAgent()
     for i in range(0, 3):
         try:
-            response = requests.get(pdf_url, headers=header, verify=False, timeout=30)
-            file_size = int(response.headers.get('Content-Length'))
+            response = requests.get(pdf_url, headers=header, timeout=30)
             break
         except Exception as e:
             time.sleep(3)
@@ -119,12 +118,18 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_url):
         takeTime = baseCore.getTimeCost(start_time, time.time())
         baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}---OBS发送失败')
         return retData
-    page_size = 0
     with fitz.open(stream=response.content, filetype='pdf') as doc:
         page_size = doc.page_count
+        log = baseCore.getLogger()
+        log.info(f'当前页码----{page_size}')
         for page in doc.pages():
             retData_f['content'] += page.get_text()
+    try:
+        req = requests.head(pdf_url)
+        file_size = int(req.headers.get('Content-Length',0))
+    except:
+        file_size = 0
     if page_size < 1:
         # pdf parsing failed
         # print(f'======pdf解析失败=====')
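Note: the new HEAD request can come back without a Content-Length header (chunked transfer, redirects), in which case file_size is 0 even though the body was already downloaded. Since response.content is in memory at this point, its length is an always-available fallback; a hedged sketch:

try:
    req = requests.head(pdf_url, timeout=10)
    file_size = int(req.headers.get('Content-Length', 0))
except Exception:
    file_size = 0
if not file_size:
    file_size = len(response.content)   # body is already in memory; len() is exact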
@@ -156,7 +161,7 @@ if __name__=='__main__':
     start_time = time.time()
     id = baseCore.redicPullData('fbspdfinfo:id')
-    # id = 514
+    # id = 537
     selectsql = f"select * from fbspdfinfo where id={id}"
     cursor.execute(selectsql)
     data = cursor.fetchone()
@@ -180,8 +185,10 @@ if __name__=='__main__':
     retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_url)
     if retData_f['state']:
         content = retData_f['content']
+        page_size = retData_f['page_size']
+        log.info(f'当前页数为{page_size}')
         #retData, com_name, year, pdf_name, num, pub_time
-        att_id= baseCore.tableUpdate(retData_f, cname,year,file_name, num,time_now)
+        att_id= baseCore.tableUpdate(retData_f, cname,year,file_name, num,year + '-12-31')
         if att_id:
             dic_news = {
                 'attachmentIds': att_id,
@@ -194,7 +201,7 @@ if __name__=='__main__':
                 'keyWords': '',
                 'lang': 'zh',
                 'origin': '企业官网',
-                'publishDate': time_now,
+                'publishDate': year + '-12-31',
                 'sid': '1684032033495392257',
                 'sourceAddress': pdf_url,  # original link
                 'summary': '',
......
@@ -91,13 +91,16 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_url):
     headers['User-Agent'] = baseCore.getRandomUserAgent()
     for i in range(0, 3):
         try:
-            response = requests.get(pdf_url, headers=headers, verify=False, timeout=30)
-            file_size = int(response.headers.get('Content-Length'))
+            response = requests.get(pdf_url, headers=headers,timeout=30)
             break
         except Exception as e:
             time.sleep(3)
             continue
     try:
+        file_size = int(response.headers.get('Content-Length'))
+    except:
+        file_size = 0
+    try:
         result = getOBSres(pathType, file_name, response)
     except:
         log = baseCore.getLogger()
@@ -159,7 +162,7 @@ if __name__=='__main__':
     cname = data[3]
     year = data[4]
     pdf_url = data[5]
-    file_name = ename + year +'年年度报告' +'.pdf'
+    file_name = ename + ':' + year + '年年度报告' + '.pdf'
     time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
     log.info(f'开始处理{ename}---{social_code}')
     retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
@@ -173,7 +176,7 @@ if __name__=='__main__':
     if retData_f['state']:
         content = retData_f['content']
         #retData, com_name, year, pdf_name, num, pub_time
-        att_id= baseCore.tableUpdate(retData_f, cname,year,file_name, num,time_now)
+        att_id= baseCore.tableUpdate(retData_f, cname,year,file_name, num,year + '-12-31')
         if att_id:
             dic_news = {
                 'attachmentIds': att_id,
@@ -186,7 +189,7 @@ if __name__=='__main__':
                 'keyWords': '',
                 'lang': 'zh',
                 'origin': '企业官网',
-                'publishDate': time_now,
+                'publishDate': year + '-12-31',
                 'sid': '1684032033495392257',
                 'sourceAddress': pdf_url,  # original link
                 'summary': '',
......
@@ -22,7 +22,7 @@ pathType = 'QYYearReport/'
 type_id = 1
 create_by = 'XueLingKun'
 taskType = '企业年报'
-file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_50_郭'
+file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_28_白'
 file_list = os.listdir(file_path)
 # print(pdf_list)
 for file in file_list:
@@ -50,7 +50,7 @@ for file in file_list:
     with fitz.open(stream=byte_stream, filetype='pdf') as doc:
         # page_size = doc.pageCount
         page_size = doc.page_count
-        print(page_size)
+        # print(page_size)
         for page in doc.pages():
             content += page.get_text()
         # print(content)
@@ -60,20 +60,21 @@ for file in file_list:
     # compute the file size
     file_size = os.path.getsize(pdf_path)
     file_size = baseCore.convert_size(file_size)
-    # selectssql = f"select file_size from clb_sys_attachment where file_size='48.00 KB' and name='{file_name}' and item_id = '{social_code}'and create_time>='2023-10-14' "
+    # selectssql = f"select file_size from clb_sys_attachment where file_size='28.00 KB' and name='{file_name}' and item_id = '{social_code}'and create_time>='2023-10-14' "
     # cursor_.execute(selectssql)
     # result = cursor_.fetchone()
     # if result:
-    #     if result[0]=='48.00 KB':
+    #     if result[0]=='28.00 KB':
     #         pass
     #     else:
    #         continue
     #     else:
     #         continue
     # else:
     #     continue
+    #
-    if social_code=='ZZSN230824151210788':
-        continue
+    # if social_code=='ZZSN230824151210788':
+    #     continue
-    updatesql = f"update clb_sys_attachment set file_size='{file_size}' where file_size = '48.00 KB' and name='{file_name}' and item_id = '{social_code}'and create_time>='2023-10-14' "
+    updatesql = f"update clb_sys_attachment set file_size='{file_size}' where name='{file_name}' and item_id = '{social_code}'and create_time>='2023-10-14' "
     cursor_.execute(updatesql)
     cnx_.commit()
-    print('更新成功')
+    print(f'更新成功---{file_size}----{file_name}---{social_code}')
\ No newline at end of file
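Note: updatesql interpolates file_name and social_code into the SQL with an f-string, so a quote character in a file name breaks the statement. A hedged parameterized form (same columns, DB-API %s placeholders as used by secrchATT above):

updatesql = (
    "update clb_sys_attachment set file_size=%s "
    "where name=%s and item_id=%s and create_time>=%s"
)
cursor_.execute(updatesql, (file_size, file_name, social_code, '2023-10-14'))
cnx_.commit()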
@@ -97,7 +97,7 @@ def spider(com_name,cik,up_okCount):
     # production URL
     url_json = f'https://data.sec.gov/submissions/CIK{cik}.json'
     # test URL
-    # url_json = 'https://data.sec.gov/submissions/CIK0001395064.json'
+    # url_json = 'https://data.sec.gov/submissions/CIK0000104169.json'
     # parse the page
     for nnn in range(0,4):
@@ -105,7 +105,7 @@ def spider(com_name,cik,up_okCount):
             # req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
             req = requests.get(url=url_json, headers=header, verify=False, timeout=30)
             break
-        except:
+        except Exception as e:
             time.sleep(2)
             continue
     try:
@@ -316,7 +316,7 @@ if __name__ == '__main__':
     start_time = time.time()
     # get the company info
     # cik = baseCore.redicPullData('Sec_cik_US:uscik_annualReport')
-    cik = '320193'
+    cik = '789019'
     data = fromcikgetinfo(cik)
     com_name = data[2]
     com_code = data[3]
......
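Note: the SEC submissions endpoint expects a 10-digit, zero-padded CIK (as the commented test URLs CIK0000104169 / CIK0001395064 show), while the hardcoded values here ('320193', '789019') are unpadded. If `cik` comes straight from the database or Redis, padding defensively is cheap; a hedged one-liner:

url_json = f'https://data.sec.gov/submissions/CIK{int(cik):010d}.json'   # zero-pad to 10 digits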