Commit ad14b725  Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

# -*- coding: utf-8 -*-
import sys
import pandas as pd
import requests
from goose3 import Goose
from goose3.text import StopWordsChinese, StopWordsKorean, StopWordsArabic
# from base.smart.entity import *
# from base.smart.smart_extractor_utility import SmartExtractorUtility
sys.path.append('D:\\kkwork\\zzsn_spider\\base\\smart')
from entity import *
from smart_extractor_utility import SmartExtractorUtility
# goose3 bundles its own lxml; the IDE may warn that etree cannot be found, but it still works
from lxml import etree
from lxml.html import HtmlElement
......@@ -135,6 +138,16 @@ class SmartExtractor:
return self.get_extraction_result(article, link_text)
def extract_by_html(self, html, link_text=''):
"""
Extract article content from raw HTML
"""
# Extract the body: pass the raw HTML to goose
article = self.goose.extract(raw_html=html)
return self.get_extraction_result(article, link_text)
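For reference, a minimal usage sketch of extract_by_html (not part of this commit; the URL is a placeholder, the 'cn' language code mirrors how SmartExtractor is constructed elsewhere in this repository, and the requests import at the top of this file is reused):

# Sketch only: fetch a page ourselves and hand the raw HTML to the extractor.
extractor = SmartExtractor('cn')
resp = requests.get('https://example.com/news/1.html', timeout=8, verify=False)
resp.encoding = resp.apparent_encoding
article = extractor.extract_by_html(resp.text)
print(article.cleaned_text[:100])   # plain-text body
print(article.text[:100])           # body with HTML tags kept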
#url_list = [["搜狐新闻",'https://news.tianyancha.com/ll_uc76l7d774.html?gid=1499023','430418'],.....]
def extract_by_url_test(url_list,list_info_all):
# Test: extract content by URL
......
# Look up the Tianyancha (天眼查) company id by unified social credit code
import json
import random
import sys
import time
import pymysql
import requests
# from base.BaseCore import BaseCore
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.adapters.DEFAULT_RETRIES = 5
# baseCore = BaseCore()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
'Accept': 'application/json, text/plain, */*',
......
......@@ -6,11 +6,12 @@ import requests, time, pymysql
import jieba
import sys
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
# from base.BaseCore import BaseCore
# from base.smart import smart_extractor
# sys.path.append('D:\\zzsn_spider\\base')
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
from smart import smart_extractor
import urllib3
......@@ -51,6 +52,22 @@ cursor_ = baseCore.cursor
taskType = '企业动态/天眼查/补采20W+'
def reqDetailmsg(url,headers):
    # proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
    # Fetch the detail page; the loop allows retries and breaks once a non-empty body
    # is returned (currently a single attempt).
    htmltext = ''
    for i in range(0,1):
        try:
            response=requests.get(url=url,headers=headers,timeout=8,verify=False)
            response.encoding = response.apparent_encoding
            htmltext=response.text
        except Exception as e:
            htmltext=''
            log.info(f'{url}---详情请求失败--{e}')
        if htmltext:
            log.info(f'{url}---详情请求成功')
            break
    return htmltext
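If more attempts are wanted, the same pattern extends naturally; a sketch (the retry count and pause are arbitrary choices, not values from this commit, and it reuses this module's requests/time/log):

def reqDetailmsgWithRetry(url, headers, retries=3):
    # Hypothetical variant of reqDetailmsg: retry a few times with a short pause between attempts.
    htmltext = ''
    for i in range(retries):
        try:
            response = requests.get(url=url, headers=headers, timeout=8, verify=False)
            response.encoding = response.apparent_encoding
            htmltext = response.text
        except Exception as e:
            htmltext = ''
            log.info(f'{url}---详情请求失败--{e}')
            time.sleep(2)
            continue
        if htmltext:
            log.info(f'{url}---详情请求成功')
            break
    return htmltext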
def beinWork(tyc_code, social_code,start_time):
time.sleep(3)
......@@ -171,13 +188,27 @@ def beinWork(tyc_code, social_code,start_time):
# Start smart extraction
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
# req = requests.get(url=link,headers=headers,timeout=10)
# html = BeautifulSoup(req.content,'html.parser')
raw_html = reqDetailmsg(link,headers)
if raw_html:
# soup = BeautifulSoup(raw_html, 'html.parser')
try:
article = smart.extract_by_html(raw_html)
content = article.cleaned_text
contentText = article.text
except Exception as e:
log.info(f'抽取失败!!{e}')
# # Body text with HTML tags kept
# contentText = smart.extract_by_url(link).text
# # Plain-text body (tags stripped)
# content = smart.extract_by_url(link).cleaned_text
# # time.sleep(3)
except Exception as e:
contentText = ''
if contentText == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
......@@ -281,7 +312,7 @@ def doJob():
while True:
# Use the social credit code pulled from Redis to look up the company's basic info in the database
social_code = baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
#social_code = '91440300665899831W'
# social_code = '913205007764477744'
# If Redis has no more data, wait
if social_code == None:
time.sleep(20)
......
......@@ -58,7 +58,7 @@ if __name__ == '__main__':
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
}
query = "select * from clb_sys_attachment where id= 383007"
query = "SELECT * FROM clb_sys_attachment WHERE type_id=1 AND source='证监会'"
cursor_.execute(query)
results = cursor_.fetchall()
for result in results:
......@@ -74,9 +74,10 @@ if __name__ == '__main__':
pass
else:
com_name = selects[1]
# full_path = 'http://114.115.215.96/' + result[6]
full_path = 'http://zzsn.luyuen.com/' + result[19]
year = result[9]
create_time = result[13]
publish = str(result[21])
content = ''
for i in range(0, 3):
try:
......@@ -102,9 +103,9 @@ if __name__ == '__main__':
'id': '',
'keyWords': '',
'lang': detect_language,
# 'origin': com_name + '企业官网',
'origin': '证监会',
# 'origin': '雪球网',
# 'publishDate': str(year) + '-12-31',
'publishDate': publish,
'sid': '1684032033495392257',
'sourceAddress': '', # 原文链接
'summary': '',
......
import json
from datetime import datetime
from kafka import KafkaProducer
from fdfs_client.client import get_tracker_conf, Fdfs_client
from bs4 import BeautifulSoup
import requests, re, time, pymysql, fitz
import urllib3
......@@ -13,12 +12,10 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/证监会'
pathType = 'QYYearReport/'
......@@ -113,6 +110,11 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
else:
continue
# print(name)
# Parse the Y-m-d publication date string into a datetime object
date_object = datetime.strptime(pub_time, "%Y-%m-%d")
# Format the datetime back into a Y-m-d H:M:S string
datetime_string = date_object.strftime("%Y-%m-%d %H:%M:%S")
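# e.g. pub_time '2023-04-28' -> datetime_string '2023-04-28 00:00:00'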
report_type = td_list[4].text.strip()
# print(report_type)
if report_type == '年报':
......@@ -123,8 +125,8 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
except Exception as e:
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
# year = int(pub_time) - 1
# year = str(year)
year = int(pub_time[:4]) - 1
# year = str(year)
# page_size = 0
......@@ -143,13 +145,16 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
return False
# Insert into the database and get the attachment id (att_id)
num = num + 1
# att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num,pub_time)
origin = '证监会'
att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num,pub_time,origin)
if att_id:
pass
else:
return False
content = retData['content']
lang = baseCore.detect_language(content)
if lang == 'cn':
lang = 'zh'
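# baseCore.detect_language reports Chinese as 'cn'; the remap above is presumably because the downstream schema uses 'zh'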
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
......@@ -160,9 +165,9 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
'deleteFlag': '0',
'id': '',
'keyWords': '',
# 'lang': 'zh',
# 'origin': '证监会',
# 'publishDate': pub_time,
'lang': lang,
'origin': origin,
'publishDate': datetime_string,
'sid': '1684032033495392257',
'sourceAddress': pdf_url, # 原文链接
'summary': '',
......@@ -174,7 +179,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
# print(dic_news)
# Send the assembled fields to Kafka for storage
try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
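# max_request_size is raised to 20 MB (kafka-python defaults to 1 MB), presumably so large report payloads are not rejected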
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
......@@ -316,8 +321,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# Fetch the company info
# social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
# social_code = '91100000100003962T'
social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
# social_code = '91210800765420138L'
if not social_code:
time.sleep(20)
continue
......
# -*- coding: utf-8 -*-
......@@ -213,7 +213,7 @@ def spider_annual_report(dict_info,num):
'sid': '1684032033495392257',
'sourceAddress': year_url, # 原文链接
'summary': '',
# 'title': name_pdf.replace(',pdf', ''),
'title': name_pdf.replace('.pdf', ''),
'type': 1,
'socialCreditCode': social_code,
'year': year
......@@ -260,7 +260,7 @@ if __name__ == '__main__':
start_time = time.time()
# Fetch the company info
# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
# social_code = '913412007050444417'
social_code = '91330000734507783B'
if not social_code:
time.sleep(20)
continue
......
......@@ -666,7 +666,7 @@ class BaseCore:
self.cnx_.commit()
# Insert into the attachment (att) table and return the attachment id
# def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time):
def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time,origin):
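# origin: human-readable source name (e.g. '证监会' or '<company>官网'); stored in the new source column of clb_sys_attachment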
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
......@@ -688,12 +688,12 @@ class BaseCore:
return id
else:
# Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
# create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn',pub_time)
create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn',pub_time,origin)
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
......
......@@ -29,7 +29,7 @@ type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
# 付俊雪's batch needs to be switched to 巨潮资讯网1_福布斯2000_PDF_60_付
# file_path = 'D:\\年报\\欧盟记分牌2500_年报补充_87_20231020'
file_path = 'D:\\年报\\福布斯2000强_年报补充_20231018'
log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news):
......@@ -146,9 +146,9 @@ if __name__=='__main__':
social_code = data[1]
ename = data[2]
cname = data[3]
# file_name = ename + ':' + file_year + '年年度报告' + '.pdf'
file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
content = ''
# origin = ename + '官网'
origin = cname + '官网'
# Parse the file's page count and content
log.info(f"-----------正在处理{file_name}--------------")
with open(pdf_path, 'rb') as file:
......@@ -178,8 +178,9 @@ if __name__=='__main__':
retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path)
if retData_f['state']:
#retData, com_name, year, pdf_name, num, pub_time, origin
# att_id= baseCore.tableUpdate(retData_f, cname,file_year,file_name, num,file_year+'-12-31')
att_id= baseCore.tableUpdate(retData_f, cname,file_year,file_name, num,file_year+'-12-31',origin)
if att_id:
detect_language = baseCore.detect_language(content)
dic_news = {
'attachmentIds': att_id,
'author': '',
......@@ -189,7 +190,7 @@ if __name__=='__main__':
'deleteFlag': '0',
'id': '',
'keyWords': '',
# 'lang': 'zh',
'lang': detect_language,
'origin': origin,
'publishDate': file_year + '-12-31',
'sid': '1684032033495392257',
......
......@@ -397,7 +397,7 @@ def get_content2():
if is_href:
num+=1
log.info('已采集----------跳过')
# time.sleep(0.5)
time.sleep(1)
continue
try:
resp = requests.get(url=href, headers=headers, verify=False)
......@@ -663,7 +663,8 @@ def bei_jing():
# bro = webdriver.Chrome(chrome_options=chrome_options, executable_path=r'D:/chrome/103/chromedriver.exe')
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
chromedriver = r'D:\cmd100\chromedriver.exe'
#bro = webdriver.Chrome(chrome_options=chrome_options, executable_path=chromedriver)
bro = webdriver.Chrome(options=chrome_options, executable_path=chromedriver)
with open('../../base/stealth.min.js') as f:
js = f.read()
......@@ -1830,7 +1831,10 @@ def hai_nan():
href = 'http://gzw.hainan.gov.cn/zwgk_23509/' + href.replace('../../', '')
elif './' in href:
href = href.replace('./', 'http://gzw.hainan.gov.cn/zwgk_23509/zfwj/bmwj/')
# is_href = db_storage.find_one({'网址': href})
try:
is_href = db_storage.find_one({'网址': href.split('?')[0]})
except:
is_href = db_storage.find_one({'网址': href})
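# Dedup lookup: try the URL with its query string stripped first, and fall back to the full URL if that fails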
if is_href:
num+=1
continue
......@@ -1906,7 +1910,7 @@ def hai_nan():
pub_time = tbody_text.split('发文日期:')[1].split('名  称:')[0].strip().lstrip().replace('年',
'-').replace(
'月', '-').replace('日', '')
# writtenDate = ''
writtenDate = None
topicClassification = tbody_text.split('分  类:')[1].split('发文机关:')[0].strip().lstrip()
contentWithTag = source.find('div', attrs={'class': 'zx-xxxqy-nr'})
content = contentWithTag.text
......@@ -1963,7 +1967,7 @@ def hai_nan():
0].strip().lstrip()
pub_source = ''
pub_hao = ''
# writtenDate = ''
writtenDate = None
topicClassification = ''
contentWithTag = source.find('div', attrs={'class': 'TRS_UEDITOR'})
content = contentWithTag.text
......@@ -2018,7 +2022,10 @@ def hai_nan():
title = str(doc_item).split('target="_blank">')[1].split('</a>')[0]
href = 'https://www.hainan.gov.cn' + str(doc_item).split('href="')[1].split('" target')[0]
# print(title,href)
# is_href = db_storage.find_one({'网址': href})
try:
is_href = db_storage.find_one({'网址': href.split('?')[0]})
except:
is_href = db_storage.find_one({'网址': href})
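# Same dedup pattern as above: match on the query-stripped URL first, then the full URL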
if is_href:
num+=1
continue
......
......@@ -58,8 +58,8 @@ class Tycdt(object):
def doJob(self):
while True:
# Use the social credit code pulled from Redis to look up the company's basic info in the database
# social_code = self.baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
social_code = self.baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
# social_code = '913205002517479347'
# If Redis has no more data, wait
if social_code == None:
time.sleep(20)
......
......@@ -50,7 +50,7 @@ if __name__=="__main__":
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
opt.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
chromedriver = r'D:\cmd100\chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
......
import datetime
import json
import time
import redis
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from kafka import KafkaProducer
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=0)
def sendKafka(dic_news):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("crawlerInfo",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.info(dic_result)
return False
def getRequest(url,headers):
req = requests.get(url=url, headers=headers, timeout=30)
if req.status_code == 200:
pass
soup = BeautifulSoup(req.content, 'html.parser')
return soup
def deletep(soup,attribute_to_delete,value_to_delete):
# Find <p> tags carrying the given attribute value and remove them
p_tags = soup.find_all('p', {attribute_to_delete: value_to_delete})
for p_tag in p_tags:
p_tag.decompose()
def deletek(soup):
    # Remove blank tags (e.g. <p></p>, <p><br></p>); img, video and br are left alone
    for i in soup.find_all(lambda tag: (len(tag.get_text()) == 0 or tag.get_text() == ' ')
                           and tag.name not in ["img", "video", "br"]):
        # Keep the tag if it still wraps an img/video/br somewhere inside
        for j in i.descendants:
            if j.name in ["img", "video", "br"]:
                break
        else:
            i.decompose()
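A small illustration of deletep and deletek on a toy snippet (illustrative only; the HTML fragment is made up):

# Illustrative only: exercise deletep/deletek on a toy fragment.
demo = BeautifulSoup('<div><p align="center">置顶</p><p></p><p><img src="a.png"></p><p>正文</p></div>', 'html.parser')
deletep(demo, 'align', 'center')   # removes the centered <p>
deletek(demo)                      # removes the empty <p>, keeps the <p> that wraps the <img>
print(demo)                        # roughly: <div><p><img src="a.png"/></p><p>正文</p></div>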
# Convert relative URLs in the HTML into absolute ones
def paserUrl(html, listurl):
# Collect all <a> and <img> tags
if isinstance(html, str):
html = BeautifulSoup(html, 'html.parser')
links = html.find_all(['a', 'img'])
# Walk the tags and join each relative href/src with the base URL
for link in links:
if 'href' in link.attrs:
link['href'] = urljoin(listurl, link['href'])
elif 'src' in link.attrs:
link['src'] = urljoin(listurl, link['src'])
return html
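And a minimal illustration of paserUrl (the URLs below are hypothetical):

# Hypothetical example: relative href/src values are joined with the page URL.
demo_html = '<a href="/qs/mulu.htm">目录</a><img src="images/cover.jpg">'
absolute = paserUrl(demo_html, 'http://www.qstheory.cn/dukan/qs/index.htm')
print(absolute)
# -> <a href="http://www.qstheory.cn/qs/mulu.htm">目录</a><img src="http://www.qstheory.cn/dukan/qs/images/cover.jpg"/>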
if __name__=='__main__':
headers = {
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9',
'Cache-Control':'max-age=0',
'Cookie':'UM_distinctid=18b5f64f72a580-0d0997e58eee04-26031e51-e1000-18b5f64f72bab5; wdcid=23a1d057521777ff; wdses=22f0d407e263a31e; CNZZDATA30019853=cnzz_eid%3D744929620-1698112534-%26ntime%3D1698112562; wdlast=1698112562',
'Host':'www.qstheory.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36'
}
url = 'http://www.qstheory.cn/qs/mulu.htm'
soup_report = getRequest(url,headers)
report_list = soup_report.find_all('div', class_='col-sm-3')
for book in report_list:
href = book.find('div', class_='booktitle').find('a')['href']
year = book.find('div', class_='booktitle').find('a').text
soup_href = getRequest(href,headers)
period = soup_href.find('div', class_='highlight')
deletep(period,'align','center')
deletek(period)
period_list = period.find_all('p')
for p in period_list:
period_href = p.find('a')['href']
period_title = p.find('a').text
soup_news = getRequest(period_href,headers)
deletep(soup_news, 'align', 'center')
deletek(soup_news)
title_list = soup_news.select('div[class="highlight"]>p')[1:]
for new in title_list:
try:
deletek(new)
try:
author = new.find('font', face='楷体').text.replace('/', '').replace('\u3000', ' ').replace('\xa0', '')
except:
continue
if len(author)>4:
continue
# if '(' in author or '本刊' in author or '国家' in author\
# or '中共' in author or '记者' in author or '新闻社' in author\
# or '党委' in author or '调研组' in author or '研究中心' in author\
# or '委员会' in author or '博物' in author or '大学' in author or '联合会' in author :
if '(' in author or '本刊' in author or '国家' in author \
or '中共' in author or '记者' in author or '新闻社' in author \
or '党委' in author or '”' in author\
or '大学' in author or '洛桑江村' in author:
continue
new_href = new.find('a')['href']
is_member = r.sismember('qiushileaderspeech::' + period_title, new_href)
if is_member:
continue
new_title = new.find('a').text.replace('\u3000',' ').lstrip(' ').replace('——', '').replace('\xa0', '')
except:
continue
soup_new = getRequest(new_href,headers)
deletek(soup_new)
deletep(soup_new, 'style', 'TEXT-ALIGN: center')
result = soup_new.find('div', class_='inner')
if result:
pass
else:
continue
span_list = result.find_all('span')
source = span_list[0].text.replace('来源:', '').strip('\r\n')
pub_time = span_list[2].text.strip('\r\n')
content = soup_new.find('div', class_='highlight').text
paserUrl(soup_new, new_href)
contentWithTag = soup_new.find('div', class_='highlight')
nowDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
dic_news = {
'sid': '1716996740019585025',
'title': new_title,
'source': "16",
'origin': source,
'author': author,
'publishDate': pub_time,
'content': content,
'contentWithTag': str(contentWithTag),
'sourceAddress': new_href,
"createDate": nowDate
}
# log.info(dic_news)
if sendKafka(dic_news):
r.sadd('qiushileaderspeech::' + period_title, new_href)
log.info(f'采集成功----{dic_news["sourceAddress"]}')