Commit 5c05e843  Author: 薛凌堃

10/24

Parent cdc4a715
...@@ -23,6 +23,7 @@ from DBUtils.PooledDB import PooledDB
# sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client
import uuid
tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf')
client = Fdfs_client(tracker_conf)
...@@ -682,12 +683,13 @@ class BaseCore:
id = ''
return id
else:
- Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+ Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
- pub_time, page_size)
+ create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time)
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
...@@ -735,6 +737,12 @@ class BaseCore:
else:
self.getLogger().info(f'=====文件存在obs========{file_path}')
#uuid 根据时间戳生成 文件名 上传到obs
def getuuid(self):
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time,create_by):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
...@@ -751,7 +759,7 @@ class BaseCore:
time.sleep(3)
continue
page_size = 0
- name = name_pdf
+ name = str(self.getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = self.getOBSres(pathType, now_time, name, response)
...@@ -773,8 +781,8 @@ class BaseCore:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
- retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
+ retData['path'] = result['body']['objectUrl'].split('.com')[1]
- retData['full_path'] = unquote(result['body']['objectUrl'])
+ retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = self.convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
...@@ -788,6 +796,6 @@ class BaseCore:
@retry(tries=3, delay=1)
def getOBSres(self, pathType, now_time, name, response):
- result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
+ result = obsClient.putContent('zzsn', pathType + name, content=response.content)
- # resp = obsClient.putFile('zzsn', f'{pathType}{now_time}/' + name, file_path='要上传的那个文件的本地路径')
+ # resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
return result
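For reference, a minimal sketch of what the new uuid-based file name and the extra object_key value written by the rewritten insert look like (illustrative only; pathType stands for the prefix passed into uptoOBS, and the bucket URL is the one hard-coded in the values above):

    import uuid

    name = str(uuid.uuid1()) + '.pdf'   # time-based uuid, unique per call
    full_path = 'https://zzsn.obs.cn-north-1.myhuaweicloud.com/' + pathType + name
    object_key = full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1]
    # object_key == pathType + name, i.e. the key inside the 'zzsn' bucket,
    # while the literal 'zzsn' goes into the new bucket_name column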
...@@ -71,7 +71,7 @@ def NewsEnterprise():
#将数据插入到redis中
for item in gn_social_list:
r.rpush('NewsEnterprise:gnqy_socialCode', item)
- # r.rpush('NewsEnterprise:gnqybuchong_socialCode', item)
+ # r.rpush('NewsEnterprise:gnqybc_socialCode', item)
# for item in gw_social_list:
# r.rpush('NewsEnterprise:gwqy_socialCode', item)
...@@ -256,7 +256,7 @@ def AnnualEnterprise_task():
def AnnualEnterpriseXueQ():
cnx,cursor = connectSql()
# 获取国内上市企业
- gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1 limit 10"
+ gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
cnx.commit()
...@@ -518,6 +518,19 @@ def fbspdfurlinfo():
for item in com_namelist:
r.rpush('ompdfurlinfo:id', item)
def dujs_1020():
cnx, cursor = connectSql()
query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=7 AND a.Place=1"
cursor.execute(query)
result = cursor.fetchall()
cnx.commit()
com_namelist = [item[0] for item in result]
for item in com_namelist:
r.rpush('dujs_1020:baseinfo_socialcode', item)
# r.rpush('dujs_1020:news_socialcode', item)
# r.rpush('dujs_1020:person_socialcode', item)
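For context, a minimal sketch of how a worker would drain the queue that dujs_1020() fills (hedged: the loop below is illustrative, modeled on the baseCore.redicPullData pattern used by the other consumers in this repo; only the 'dujs_1020:baseinfo_socialcode' key comes from the code above):

    # Illustrative consumer loop for the dujs_1020 queue (not part of this commit).
    while True:
        social_code = baseCore.redicPullData('dujs_1020:baseinfo_socialcode')
        if not social_code:
            time.sleep(20)
            continue
        # ... fetch and process the enterprise identified by social_code ...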
if __name__ == "__main__":
start = time.time()
# fbspdfurlinfo()
...@@ -530,13 +543,15 @@ if __name__ == "__main__":
# AnnualEnterprise()
# BaseInfoEnterpriseAbroad()
# NewsEnterprise_task()
- # NewsEnterprise()
+ NewsEnterprise()
# AnnualEnterpriseXueQ()
# dujs_1020()
# dujioashou()
# BaseInfoEnterprise()
# FBS()
# MengZhi()
# NQEnterprise()
- SEC_CIK()
+ # SEC_CIK()
# dujioashou()
# omeng()
# AnnualEnterpriseUS()
......
import json
- from fdfs_client.client import get_tracker_conf, Fdfs_client
+ from kafka import KafkaProducer
- from bs4 import BeautifulSoup
+ import requests, time, fitz
import requests, re, time, pymysql, fitz
import urllib3
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
- cnx_ = baseCore.cnx
+ cnx_ = baseCore.cnx_
- cursor = baseCore.cursor
+ cursor_ = cnx_.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/证监会'
# def get_proxy():
# cursor = cnx_ip.cursor()
# sql = "select proxy from clb_proxy"
# cursor.execute(sql)
# proxy_lists = cursor.fetchall()
# ip_list = []
# for proxy_ in proxy_lists:
# ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
# proxy_list = []
# for str_ip in ip_list:
# str_ip_list = str_ip.split('-')
# proxyMeta = "http://%(host)s:%(port)s" % {
# "host": str_ip_list[0],
# "port": str_ip_list[1],
# }
# proxy = {
# "HTTP": proxyMeta,
# "HTTPS": proxyMeta
# }
# proxy_list.append(proxy)
# return proxy_list
def RequestUrl(url, payload, item_id, start_time):
# ip = get_proxy()[random.randint(0, 3)]
response = requests.post(url=url, headers=headers, data=payload) # ,proxies=ip)
response.encoding = response.apparent_encoding
# 检查响应状态码
if response.status_code == 200:
# 请求成功,处理响应数据
# print(response.text)
soup = BeautifulSoup(response.text, 'html.parser')
pass
else:
# 请求失败,输出错误信息
print('请求失败:', response.status_code, response.text)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, url, '请求失败')
soup = ''
return soup
def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by, create_time, page_size):
sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s'''
cursor_.execute(sel_sql, (item_id, year))
selects = cursor_.fetchone()
if selects:
print(f'{name_pdf},{year}已存在')
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by,
create_time, page_size)
cursor_.execute(Upsql, values) # 插入
cnx.commit() # 提交
print("更新完成:{}".format(Upsql))
# 采集信息
def SpiderByZJH(url, payload, dic_info, num, start_time):
item_id = dic_info[2]
# years = dic_info['call_year']
short_name = dic_info[4]
soup = RequestUrl(url, payload, item_id, start_time)
if soup == '':
return False
# 先获取页数
page = 0
try:
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
except:
e = f"该企业没有{dic_parms['Catagory2']}数据"
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, dic_parms['url'], f'{e}')
return False
if page != 0:
total = re.findall(r'\d+', page)[0]
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
Maxpage = int(total) // 15 + 1
# 首页和其他页不同,遍历 如果是首页 修改一下链接
for i in range(1, Maxpage + 1):
if i == 1:
href = url
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, item_id, start_time)
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
for tr in tr_list[1:]:
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
# print(name)
report_type = td_list[4].text.strip()
# print(report_type)
if report_type == '年报':
if '摘要' in name_pdf:
continue
# 年份还从pdf名称里抽取
try:
year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
except Exception as e:
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
year = int(pub_time) - 1
year = str(year)
page_size = 0
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s'''
cursor_.execute(sel_sql, (item_id, year))
selects = cursor_.fetchone()
if selects:
print(f'com_name:{short_name}、{year}已存在')
continue
else:
# 类型为年报的话就解析该年报pdf,并入库
for i in range(0, 3):
try:
resp_content = requests.request("GET", pdf_url).content
# 获取pdf页数
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
break
except Exception as e:
print(e)
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
print(f'==={short_name}、{year}===pdf解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
continue
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
print(e)
time.sleep(3)
continue
if result == '':
e = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- type_id = '1'
+ cnx = baseCore.cnx
cursor = baseCore.cursor
item_id = item_id
- group_name = 'group1'
+ def sendKafka(dic_news):
try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
- path = bytes.decode(result['Remote file_id']).replace('group1', '')
+ print(kafka_result.get(timeout=10))
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
page_size = page_size
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
category, file_size, order_by, status, create_by, create_time, page_size)
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
except:
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
num = num + 1
time.sleep(2)
else:
e = '采集失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
else:
continue
return True
- def getUrl(code, url_parms, Catagory2_parms):
+ dic_result = {
- # 深市
+ 'success': 'ture',
- if code[0] == '2' or code[0] == '0' or code[0] == '3':
+ 'message': '操作成功',
- url = f'http://eid.csrc.gov.cn/{url_parms[1]}/index_f.html'
+ 'code': '200',
Catagory2 = Catagory2_parms[1]
# 构建POST请求的参数,prodType --- 股票代码
payload2 = {
'prodType': f'{code}',
'prodType2': '代码/简称/拼音缩写 ',
'keyWord': '',
'keyWord2': '关键字',
'startDate': '',
'startDate2': '请输入开始时间',
'endDate': '',
'endDate2': '请输入结束时间',
'selCatagory2': f'{Catagory2}',
'selBoardCode0': '',
'selBoardCode': ''
}
dic_parms = {
'code': code,
'url': url,
'Catagory2': Catagory2,
'payload': payload2
}
# 沪市
if code[0] == '9' or code[0] == '6':
url = f'http://eid.csrc.gov.cn/{url_parms[0]}/index_f.html'
Catagory2 = Catagory2_parms[0]
payload1 = {
'prodType': f'{code}',
'prodType2': '代码/简称/拼音缩写 ',
'keyWord': '',
'keyWord2': '关键字',
'startDate': '',
'startDate2': '请输入开始时间',
'endDate': '',
'endDate2': '请输入结束时间',
'selCatagory2': f'{Catagory2}',
'selCatagory3': '',
'selBoardCode0': '',
'selBoardCode': '',
}
dic_parms = {
'code': code,
'url': url,
'Catagory2': Catagory2,
'payload': payload1
}
log.info(dic_result)
- # 北交所
+ return True
- if code[0] == '8' or code[0] == '4':
+ except Exception as e:
- try:
+ dic_result = {
- url = f'http://eid.csrc.gov.cn/{url_parms[2]}/index_f.html'
+ 'success': 'false',
- except:
+ 'message': '操作失败',
- return
+ 'code': '204',
- Catagory2 = Catagory2_parms[2]
+ 'e': e
payload3 = {
'prodType': f'{code}',
'prodType2': '代码/简称/拼音缩写 ',
'keyWord': '',
'keyWord2': '关键字',
'startDate': '',
'startDate2': '请输入开始时间',
'endDate': '',
'endDate2': '请输入结束时间',
'selCatagory2': f'{Catagory2}'
}
dic_parms = {
'code': code,
'url': url,
'Catagory2': Catagory2,
'payload': payload3
}
- return dic_parms
+ log.info(dic_result)
return False
#state1
if __name__ == '__main__':
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Length': '380',
'Content-Type': 'application/x-www-form-urlencoded',
'Cookie': 'acw_tc=01c6049e16908026442931294e4d0b65d95e3ba93ac19993d151844ac6',
'Host': 'eid.csrc.gov.cn',
'Origin': 'http://eid.csrc.gov.cn',
'Pragma': 'no-cache',
'Referer': 'http://eid.csrc.gov.cn/101111/index_1_f.html',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
}
header = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
- 'Cookie': 'ba17301551dcbaf9_gdp_user_key=; gdp_user_id=gioenc-4c21c93a%2Ccdgd%2C5c8b%2Cc32e%2C8g0229546a17; ba17301551dcbaf9_gdp_session_id_dc777856-a24e-4008-a8a6-af88d75bae2b=true; ba17301551dcbaf9_gdp_sequence_ids={%22globalKey%22:3%2C%22VISIT%22:2%2C%22PAGE%22:2}; acw_tc=71dbb29c16908906086793104e8117f44af84d756f68927c202e9a70b1',
+ 'Cache-Control': 'max-age=0',
- 'Host': 'static.sse.com.cn',
+ 'sec-ch-ua': '"Chromium";v="112", "Microsoft Edge";v="112", "Not:A-Brand";v="99"',
- 'Pragma': 'no-cache',
+ 'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
}
query = "select * from clb_sys_attachment where id= 383022"
- # 读取数据库获取股票代码 简称 以及 社会信用代码
+ cursor_.execute(query)
- num = 1
+ results = cursor_.fetchall()
- while True:
+ for result in results:
- start_time = time.time()
+ att_id = result[0]
- # 获取企业信息
+ name = result[1]
# social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
- # if social_code == '':
+ social_code = result[3]
- # time.sleep(20)
+ selectcom = f"select * from EnterpriseInfo where SocialCode = '{social_code}'"
- # continue
+ cursor.execute(selectcom)
selects = cursor.fetchone()
- # 获取企业信息
+ com_name = selects[4]
- query = "SELECT * FROM Tfbs_bak where col3 is not null and length(col3)>3 and col3 not like 'ZZSN%' and state1='1' limit 1 "
+ if com_name:
# 兴业银行
# query = "SELECT * FROM Tfbs_bak where col3 is not null and length(col3)>3 and col3 not like 'ZZSN%' and col5='通威股份'"
cursor.execute(query)
row = cursor.fetchone()
if row:
pass
else:
- print('没有数据了,结束脚本')
+ com_name = selects[1]
- break
+ full_path = 'http://114.115.215.96/' + result[6]
year = result[9]
- # tycid = row[14]
+ create_time = result[13]
- com_name = row[6]
+ content = ''
- social_code = row[4]
+ for i in range(0, 3):
- code = row[7]
+ try:
- time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+ response = requests.get(url=full_path, headers=header, timeout=30)
- #1表示拿到数据
+ break
- updateBeginSql = f"update Tfbs_bak set state1='0' and date1='{time_now}' where col3='{social_code}' "
+ except Exception as e:
- cursor.execute(updateBeginSql)
+ time.sleep(3)
- cnx.commit()
+ continue
- dic_info = baseCore.getInfomation(social_code)
+ with fitz.open(stream=response.content, filetype='pdf') as doc:
- # count = dic_info[15]
+ page_size = doc.page_count
- # 沪市http://eid.csrc.gov.cn/101111/index.html 深市http://eid.csrc.gov.cn/101811/index.html 北交所http://eid.csrc.gov.cn/102611/index.html
+ log = baseCore.getLogger()
- # url 变量 翻页 栏目 http://eid.csrc.gov.cn/101811/index_3_f.html
+ log.info(f'当前页码----{page_size}')
- url_parms = ['101111', '101811', '102611']
+ for page in doc.pages():
- Catagory2_parms = ['9604', '10058', '10162']
+ content += page.get_text()
- # 根据股票代码选链接
+ detect_language = baseCore.detect_language(content)
- # 股票代码0、2、3开头的为深圳交易所,6、9开头的为上海交易所,4、8开头的为北京交易所
+ dic_news = {
- try:
+ 'attachmentIds': att_id,
- code = dic_info[3]
+ 'author': '',
- except Exception as e:
+ 'content': content,
- print(e,social_code)
+ 'contentWithTag': '',
- continue
+ 'createDate': str(create_time),
- dic_parms = getUrl(code, url_parms, Catagory2_parms)
+ 'deleteFlag': '0',
'id': '',
- SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, num, start_time)
+ 'keyWords': '',
- end_time = time.time()
+ 'lang': detect_language,
- print(f'{com_name} ---- 该企业耗时 ---- {end_time - start_time}')
+ 'origin': com_name + '企业官网',
- # count += 1
+ 'publishDate': str(year) + '-12-31',
- runType = 'AnnualReportCount'
+ 'sid': '1684032033495392257',
- # baseCore.updateRun(social_code, runType, count)
+ 'sourceAddress': '', # 原文链接
'summary': '',
'title': name.replace('.pdf',''),
'type': 1,
'socialCreditCode': social_code,
'year': year
}
if sendKafka(dic_news):
# 100表示成功
log.info(f'==========={social_code}成功============')
cnx.close()
cursor_.close()
......
import json
...@@ -316,7 +316,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
- social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
+ # social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
social_code = '91100000100003962T'
if not social_code:
time.sleep(20)
continue
......
# -*- coding: utf-8 -*-
...@@ -16,8 +16,22 @@ import requests, re, time, pymysql, fitz
from bs4 import BeautifulSoup as bs
from selenium import webdriver
- chromedriver = "D:/chrome/chromedriver.exe"
+ # chromedriver = "D:/chrome/chromedriver.exe"
- browser = webdriver.Chrome(chromedriver)
+ # browser = webdriver.Chrome(chromedriver)
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
opt.binary_location = r'D:/Google/Chrome/Application/chrome.exe'
chromedriver = r'D:/cmd100/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
from fdfs_client.client import get_tracker_conf, Fdfs_client
log = baseCore.getLogger()
requests.adapters.DEFAULT_RETRIES = 3
...@@ -73,7 +87,8 @@ def spider_annual_report(dict_info,num):
for i in list_all:
# ip = get_proxy()[random.randint(0, 3)]
pdf_name_a = i.text
if 'H股公告' in pdf_name_a:
continue
year_url = 'https://vip.stock.finance.sina.com.cn' + i.get('href')
year_name = i.text
browser.get(year_url)
...@@ -93,7 +108,7 @@ def spider_annual_report(dict_info,num):
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue
#公告日期
- pub_time = soup_2.find('td',{'class':'head'}).text.split('公告日期')[1]
+ pub_time = soup_2.find('td',{'class':'head'}).text.split('公告日期:')[1]
try:
# 标题中有年份,
...@@ -169,12 +184,12 @@ def spider_annual_report(dict_info,num):
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '成功')
- except:
+ except Exception as e:
exception = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
- baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
+ baseCore.recordLog(social_code, taskType, state, takeTime, year_url, f'{exception} - --{e}')
return False
#发送数据到kafka
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
...@@ -210,7 +225,7 @@ def spider_annual_report(dict_info,num):
'message': '操作成功',
'code': '200',
}
- print(dic_result)
+ log.info(dic_result)
# return True
except Exception as e:
dic_result = {
...@@ -222,7 +237,7 @@ def spider_annual_report(dict_info,num):
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
- print(dic_result)
+ log.info(dic_result)
return False
# num = num + 1
...@@ -240,7 +255,7 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
- social_code = '9133060072360502XQ'
+ social_code = '91100000100003962T'
if not social_code:
time.sleep(20)
continue
...@@ -272,11 +287,12 @@ if __name__ == '__main__':
'code':code,
}
# list_info.append(dict_info)
- spider_annual_report(dict_info,num)
+ if spider_annual_report(dict_info,num):
count += 1
runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count)
break
# cursor.close()
cnx_.close()
# 释放资源
......
...@@ -4,6 +4,7 @@ import random
import socket
import sys
import time
import uuid
import logbook
import logbook.more
...@@ -742,6 +743,12 @@ class BaseCore:
else:
self.getLogger().info(f'=====文件存在obs========{file_path}')
import uuid
def getuuid(self):
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time,create_by,file_path):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
...@@ -758,7 +765,7 @@ class BaseCore:
time.sleep(3)
continue
- name = name_pdf + '.pdf'
+ name = str(self.getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = self.getOBSres(pathType, now_time, name, file_path)
......
...@@ -9,7 +9,7 @@ from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
import uuid
from retry import retry
obsClient = ObsClient(
...@@ -29,7 +29,7 @@ type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
#付俊雪的需要改为巨潮资讯网1_福布斯2000_PDF_60_付
- file_path = 'D:\\年报\\失败'
+ file_path = 'D:\\年报\\欧盟记分牌2500_年报补充_87_20231020'
log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news):
...@@ -66,6 +66,11 @@ def sendKafka(dic_news):
log.info(dic_result)
return False
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
"""
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
...@@ -93,7 +98,8 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
'create_time': create_time, 'page_size': page_size, 'content': content}
try:
- result = getOBSres(pathType, file_name, pdf_path)
+ name = str(getuuid()) + '.pdf'
result = getOBSres(pathType, name, pdf_path)
except:
log = baseCore.getLogger()
log.error(f'OBS发送失败')
...@@ -101,8 +107,8 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData_f['state'] = True
- retData_f['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
+ retData_f['path'] = result['body']['objectUrl'].split('.com')[1]
- retData_f['full_path'] = unquote(result['body']['objectUrl'])
+ retData_f['full_path'] = result['body']['objectUrl']
retData_f['create_time'] = time_now
except Exception as e:
state = 0
......
...@@ -51,7 +51,7 @@ if __name__=="__main__":
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
- chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
+ chromedriver = r'D:\cmd100\chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
browser.get(url)
......
...@@ -18,10 +18,81 @@ element.getparent() #获取给定元素的父元素
# aa = pd.read_csv(StringIO(data),escapechar='\r')
# print(aa)
- import pandas as pd
+ # import pandas as pd
#
# # 读取txt文件
# data = pd.read_csv('D:\\美国证券交易委员会\\2022q4\\sub.txt', delimiter='\t') # 根据实际情况选择正确的分隔符
#
# # 将数据保存为csv文件
# data.to_csv('D:\\美国证券交易委员会\\2022q4\\sub.csv', index=False) # index=False表示不保存行索引
"""验证码识别测试"""
# import ddddocr
#
# ocr = ddddocr.DdddOcr()
#
# with open("D:\\kkwork\\captchaNew (3).jfif", 'rb') as f:
# image = f.read()
#
# res = ocr.classification(image)
# print(res)
"""测试中国执行信息公开网 模拟浏览器"""
import ddddocr
from PIL import Image
import re
import requests, time, random, json, pymysql, redis
import urllib3
from bs4 import BeautifulSoup
from selenium import webdriver
from obs import ObsClient
from kafka import KafkaProducer
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
def use_ocr(img):
ocr = ddddocr.DdddOcr()
with open(img, 'rb') as f:
image = f.read()
res = ocr.classification(image)
print(res)
return res
if __name__=="__main__":
requests.DEFAULT_RETRIES = 5
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.info(f'开始时间为:{time_start}')
requests.adapters.DEFAULT_RETRIES = 3
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
opt.binary_location = r'D:/Google/Chrome/Application/chrome.exe'
chromedriver = r'D:/cmd100/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "http://zxgk.court.gov.cn/shixin/"
browser.get(url)
# 可改动
time.sleep(20)
- # 读取txt文件
+ screen_img_path = "D:/screen/xxx.png"
- data = pd.read_csv('D:\\美国证券交易委员会\\2023q2\\pre.txt', delimiter='\t')  # 根据实际情况选择正确的分隔符
+ out_img_path = "D:/out/xxx.png"
ele = driver.find_element(By.ID, 'XXXX')
- # 将数据保存为csv文件
+ code = use_ocr(out_img_path)
- data.to_csv('D:\\美国证券交易委员会\\2023q2\\pre.csv', index=False)  # index=False表示不保存行索引
+ 验证码输入框元素.send_keys(code)
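The step that turns the full-page screenshot into the cropped captcha image is left elided in this commit; a minimal sketch of one way to fill it in with Pillow (purely illustrative — the variable names follow the snippet above, and the element id 'captchaImg' is a hypothetical placeholder):

    # Hypothetical crop step: screenshot the page, then cut the captcha element out of it.
    from PIL import Image
    from selenium.webdriver.common.by import By

    browser.save_screenshot(screen_img_path)                    # full-page screenshot
    captcha_ele = browser.find_element(By.ID, 'captchaImg')     # assumed captcha <img> element
    left, top = captcha_ele.location['x'], captcha_ele.location['y']
    right = left + captcha_ele.size['width']
    bottom = top + captcha_ele.size['height']
    Image.open(screen_img_path).crop((left, top, right, bottom)).save(out_img_path)
    code = use_ocr(out_img_path)                                # then OCR as above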