Commit a78391e7 Author: 薛凌堃

11/20

Parent 9d6175f9
......@@ -413,9 +413,13 @@ class BaseCore:
"host": str_ip_list[0],
"port": str_ip_list[1],
}
# proxy = {
# "HTTP": proxyMeta,
# "HTTPS": proxyMeta
# }
proxy = {
"HTTP": proxyMeta,
"HTTPS": proxyMeta
"http": proxyMeta,
"https": proxyMeta
}
proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)]
......
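The hunk above switches the `proxy` dict keys from `"HTTP"`/`"HTTPS"` to lowercase `"http"`/`"https"`. Since requests matches proxies by the (lowercase) URL scheme, uppercase keys are effectively never used. A minimal sketch of the corrected pattern; the helper name and sample addresses below are placeholders, not part of the repo:

```python
# Minimal sketch: requests-style proxies dict with lowercase scheme keys.
# The helper name and the sample addresses are placeholders.
import random
import requests

def pick_proxy(ip_ports):
    proxy_list = []
    for ip_port in ip_ports:
        proxyMeta = f"http://{ip_port}"
        proxy_list.append({
            "http": proxyMeta,   # matched for http:// URLs
            "https": proxyMeta,  # matched for https:// URLs
        })
    return proxy_list[random.randint(0, len(proxy_list) - 1)]

# proxies = pick_proxy(["1.2.3.4:8080", "5.6.7.8:8080"])
# requests.get("https://example.com", proxies=proxies, timeout=10)
```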
......@@ -564,13 +564,13 @@ if __name__ == "__main__":
# kegaishifan()
# shuangbaiqiye()
# zhuangjingtexind()
# NoticeEnterprise()
NoticeEnterprise()
# AnnualEnterpriseIPO()
# AnnualEnterprise()
# BaseInfoEnterprise()
# BaseInfoEnterpriseAbroad()
# NewsEnterprise_task()
NewsEnterprise()
# NewsEnterprise()
# CorPerson()
# china100()
# global100()
......
......@@ -584,7 +584,7 @@ if __name__ == '__main__':
continue
social_code = company_field.split('|')[0]
com_name = company_field.split('|')[2]
com_name = company_field.split('|')[2].replace(" ", "")
ynDomestic = company_field.split('|')[15]
countryName = company_field.split('|')[16]
......
......@@ -33,8 +33,7 @@ class EsMethod(object):
def queryatt(self,index_name,pnum):
body = {
"_source": ["attachmentIds", "sourceAddress", "title",
"publishDate", "createDate"],
"_source": ["attachmentIds", "sourceAddress", "title", "createDate"],
"query": {
"bool": {
"must": [
......@@ -64,7 +63,6 @@ class EsMethod(object):
'hits.hits._source.title',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.publishDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
......@@ -86,11 +84,11 @@ class EsMethod(object):
# 根据es附件id,去附件表中查询,未查询到的研报,根据title查询出附件id,更新es
def secrchATT(item_id, retData, type_id,order_by):
def secrchATT(type_id,id,title,attid):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
sel_sql = '''select id from clb_sys_attachment where type_id=%s and id = %s '''
lock.acquire()
cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
    cursor_.execute(sel_sql, (type_id, attid))
selects = cursor_.fetchone()
lock.release()
return selects
......@@ -113,8 +111,11 @@ if __name__ == "__main__":
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
attid = mms['_source']['attachmentIds'][0]
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
log.info(f'{id}-{attid}--{title}--{sourceAddress}---')
selects = secrchATT('4',id,title,attid)
if selects:
pass
else:
baseCore.r.lpush('YanBao:id', id)
\ No newline at end of file
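For context, the loop above reads each ES hit, checks whether its first attachment id already exists in `clb_sys_attachment`, and pushes the ES document id onto the `YanBao:id` Redis list when it does not. A condensed sketch of that check-and-queue step, assuming a pymysql cursor guarded by a `threading.Lock` and a `redis.Redis` client; connection details are placeholders:

```python
# Condensed sketch of the check-and-queue step. Connection details are
# placeholders; the SQL mirrors the updated secrchATT query.
import threading
import pymysql
import redis

lock = threading.Lock()
cnx = pymysql.connect(host="127.0.0.1", user="root", password="***", db="clb")
cursor_ = cnx.cursor()
r = redis.Redis(host="127.0.0.1", port=6379)

def secrch_att(type_id, attid):
    sel_sql = "select id from clb_sys_attachment where type_id=%s and id=%s"
    with lock:
        cursor_.execute(sel_sql, (type_id, attid))  # two placeholders, two params
        return cursor_.fetchone()

def queue_if_missing(es_id, attid):
    # No matching attachment row: queue the ES doc id for re-processing.
    if not secrch_att("4", attid):
        r.lpush("YanBao:id", es_id)
```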
......@@ -22,7 +22,7 @@ def delete(object_keys):
obs_client.close()
if __name__=='__main__':
query = "SELECT object_key FROM clb_sys_attachment WHERE type_id=8 AND source = '证监会' AND create_time >= '2023-10-30 16:46:09' AND create_time <= '2023-11-01 09:11:12'"
query = "SELECT object_key FROM clb_sys_attachment WHERE type_id=8 AND item_id = '914405007229411602' "
cursor_.execute(query)
results = cursor_.fetchall()
object_keys = [item[0] for item in results]
......
"""
"""
......@@ -77,6 +77,7 @@ class EsMethod(object):
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.labels.relationId',
# 'hits.hits._source.labels',
'hits.hits._source.publishDate',
'hits.hits._source.year',
'hits.hits._source.createDate',
......@@ -254,8 +255,12 @@ if __name__ == '__main__':
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
try:
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
except:
continue
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
......
"""
"""
......@@ -262,8 +262,12 @@ def main(page,p,esMethod):
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
try:
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
except:
continue
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
......
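Both hunks above add the same guard: hits whose `_source` lacks `labels[0]['relationId']` or `year` are skipped instead of aborting the scan. The bare `except: continue` also hides unrelated errors; a slightly narrower variant of the same idea (field names come from the hunk, everything else is illustrative):

```python
# Sketch: skip ES hits that are missing the label or year fields, without
# swallowing unrelated exceptions. "mms" is a single hit from hits.hits.
def extract_company_fields(mms):
    src = mms["_source"]
    try:
        social_code = src["labels"][0]["relationId"]
        year = src["year"]
    except (KeyError, IndexError, TypeError):
        return None  # incomplete hit; caller should skip it
    return social_code, year

# In the scan loop:
# fields = extract_company_fields(mms)
# if fields is None:
#     continue
# social_code, year = fields
```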
......@@ -4,6 +4,7 @@ import re
import time
import uuid
from datetime import datetime
import random
import requests
from bs4 import BeautifulSoup
......@@ -150,39 +151,19 @@ def tableUpdate(retData, com_name, year, pdf_name, num,pub_time,origin):
id = selects[0]
return id
@retry(tries=3, delay=5)
def RequestUrl(url, payload, social_code,start_time):
# ip = get_proxy()[random.randint(0, 3)]
for m in range(0, 3):
try:
response = requests.post(url=url, headers=headers, data=payload) # ,proxies=ip)
response.encoding = response.apparent_encoding
break
except Exception as e:
log.error(f"request请求异常----{m}-----{e}")
pass
ip = baseCore.get_proxy()
# proxy = {'https': 'http://127.0.0.1:8888', 'http': 'http://127.0.0.1:8888'}
response = requests.post(url=url, headers=headers, data=payload, proxies=ip)
response.encoding = response.apparent_encoding
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
return soup
else:
raise
# 检查响应状态码
try:
if response.status_code == 200:
# 请求成功,处理响应数据
# print(response.text)
soup = BeautifulSoup(response.text, 'html.parser')
pass
else:
# 请求失败,输出错误信息
log.error('请求失败:', url)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
soup = ''
except:
log.error('请求失败:', url)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
soup = ''
return soup
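The rewrite above replaces the manual three-attempt loop and inline status handling with `@retry(tries=3, delay=5)` plus a bare `raise` on non-200 responses, so the caller decides what a final failure means. A standalone sketch of the same pattern, assuming the `retry` package (`pip install retry`) and a proxy helper that returns a requests-style dict; the explicit `HTTPError` stands in for the hunk's bare `raise`:

```python
# Sketch of the retry-wrapped fetch. "get_proxies" is a stand-in for
# baseCore.get_proxy(); headers and payload come from the caller.
import requests
from bs4 import BeautifulSoup
from retry import retry

@retry(tries=3, delay=5)
def request_url(url, payload, headers, get_proxies=None):
    proxies = get_proxies() if get_proxies else None
    response = requests.post(url, headers=headers, data=payload,
                             proxies=proxies, timeout=30)
    response.encoding = response.apparent_encoding
    if response.status_code != 200:
        # Raising re-triggers @retry; after three tries the error propagates
        # to the caller, which logs it and records the failed task.
        raise requests.HTTPError(f"status {response.status_code} for {url}")
    return BeautifulSoup(response.text, "html.parser")
```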
def getUrl(code, url_parms, Catagory2_parms):
# 深市
......@@ -352,7 +333,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
# print(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
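`max_request_size=1024*1024*20` raises the kafka-python producer's per-request limit to roughly 20 MB so large research-report payloads are not rejected client-side. The broker's `message.max.bytes` (or the topic's `max.message.bytes`) must allow messages of the same size, or the broker will still refuse them. A minimal sketch with the broker address and topic taken from the hunk; the payload itself is a placeholder:

```python
# Sketch: producer configured for large payloads.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["114.115.159.144:9092"],
    max_request_size=20 * 1024 * 1024,  # client-side cap per request
)
dic_news = {"title": "example report", "content": "..."}
future = producer.send(
    "researchReportTopic",
    json.dumps(dic_news, ensure_ascii=False).encode("utf8"),
)
print(future.get(timeout=10))  # blocks until the broker acknowledges
```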
......@@ -384,8 +365,15 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
if short_name == 'None':
short_name = dic_info[1]
com_name = dic_info[1]
soup = RequestUrl(url, payload, social_code, start_time)
try:
soup = RequestUrl(url, payload, social_code, start_time)
except:
# 请求失败,输出错误信息
log.error(f'请求失败:{url}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
soup = ''
if soup == '':
return
# 判断查找内容是否存在
......@@ -422,8 +410,15 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}_f.html'
soup = RequestUrl(href, payload, social_code, start_time)
try:
soup = RequestUrl(href, payload, social_code, start_time)
except:
# 请求失败,输出错误信息
                log.error(f'请求失败:{url}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
soup = ''
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
......@@ -474,12 +469,13 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
insert = InsterInto(social_code,pdf_url,pub_time,name_pdf)
if insert:
log.info(f'===={social_code}========{name_pdf}=====插入库成功')
pass
else:
continue
else:
log.info(f'======={short_name}========{code}===已存在')
continue
# continue
break
if __name__ == '__main__':
num = 0
......@@ -519,7 +515,7 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('NoticeEnterprise:gnqy_socialCode')
social_code = '91110108740053589U'
social_code = '91370000163446410B'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
......
......@@ -84,16 +84,17 @@ def get_info(dict_json):
updatewxLink(url_news,info_source_code,400)
return False
soup_news = BeautifulSoup(res_news.content, 'html.parser')
news_html = soup_news.find('div', {'id': 'js_content'})
news_html['style'] = 'width: 814px ; margin: 0 auto;'
try:
news_html = soup_news.find('div', {'id': 'js_content'})
news_html['style'] = 'width: 814px ; margin: 0 auto;'
#del news_html['style']
news_html=rm_style_attr(news_html)
del news_html['id']
del news_html['class']
except:
pass
log.error(f'{url_news}-----{info_source_code}')
return False
try:
news_content = news_html.text
except:
......@@ -244,6 +245,6 @@ if __name__=="__main__":
num_caiji = num_caiji + 1
log.info(f'-----已采集{num_caiji}篇文章---来源{dict_json["site_name"]}----')
else:
break
continue
baseCore.close()
\ No newline at end of file
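The WeChat hunk above wraps the `js_content` handling in try/except so that articles whose body `<div>` is missing (deleted or image-only posts) log the URL and source code and return False instead of raising on `None`. An equivalent explicit check, with `rm_style_attr` omitted and the logger call shown as a placeholder:

```python
# Sketch: explicit None check instead of a broad try/except. soup.find()
# returns None when the page has no <div id="js_content">.
from bs4 import BeautifulSoup

def extract_article_body(html, url_news):
    soup_news = BeautifulSoup(html, "html.parser")
    news_html = soup_news.find("div", {"id": "js_content"})
    if news_html is None:
        print(f"no js_content: {url_news}")  # placeholder for log.error(...)
        return None
    news_html["style"] = "width: 814px ; margin: 0 auto;"
    del news_html["id"]     # bs4 ignores deletion of absent attributes
    del news_html["class"]
    return news_html.text
```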
......@@ -8,7 +8,7 @@ import requests
import urllib3
from pymysql.converters import escape_string
import sys
sys.path.append('D:\\zzsn_spider\\base')
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore =BaseCore.BaseCore()
......