Commit 7ef6f432  Author: 薛凌堃

11.30

Parent 06d190ff
 import re
+import fitz
 import requests
 from bs4 import BeautifulSoup
 import pandas as pd
@@ -38,7 +39,8 @@ def getSoup(url):
 def getPageSize():
-    url = 'https://www.cushmanwakefield.com.cn/research-report/p1.html?expert=0'
+    # url = 'https://www.cushmanwakefield.com.cn/research-report/p1.html?expert=0'
+    url = 'https://www.cushmanwakefield.com.cn/research-report/p1.html?expert=1'
     soup = getSoup(url)
     total = int(re.findall('\d+', soup.find('dl', class_='sousuo_result').text.lstrip().strip())[0])
     if total % 4 == 0:
@@ -49,20 +51,27 @@ def getPageSize():
 def getContent(url):
+    content = ''
     ip = baseCore.get_proxy()
     req = requests.get(url, headers=headers, proxies=ip)
-    req.encoding = req.apparent_encoding
-    return req.content
+    # req.encoding = req.apparent_encoding
+    with fitz.open(stream=req.content, filetype='pdf') as doc:
+        page_size = doc.page_count
+        for page in doc.pages():
+            content += page.get_text()
+    return content

 def doJob():
-    if not os.path.exists('./研究咨询/戴德梁兴/行业视角-研究报告'):
-        os.makedirs('./研究咨询/戴德梁兴/行业视角-研究报告')
+    # if not os.path.exists('./研究咨询/戴德梁兴/行业视角-研究报告'):
+    #     os.makedirs('./研究咨询/戴德梁兴/行业视角-研究报告')
     num = 1
     data_list = []
     pageSize = getPageSize()
     for page in range(1, pageSize + 1):
-        url = f'https://www.cushmanwakefield.com.cn/research-report/p{page}.html?expert=0'
+        # url = f'https://www.cushmanwakefield.com.cn/research-report/p{page}.html?expert=0'
+        url = f'https://www.cushmanwakefield.com.cn/research-report/p{page}.html?expert=1'
         soup = getSoup(url)
         div_list = soup.find('div', class_='guwen_list_box').find_all('div', class_='zhuangyuan_guwen_box')
         for div in div_list:
@@ -74,7 +83,8 @@ def doJob():
             origin = '戴德梁兴'
             try:
                 content = getContent(href)
-            except:
+                # print(content)
+            except Exception as e:
                 log.error(f'第{page}页==={name}===连接失败')
                 continue
             title = name.replace('/',' ').replace('|',' ').replace('?',' ').replace('"','”')
......
# coding:utf-8
import time
import urllib3
import BaseCore
from elasticsearch import Elasticsearch
import threading
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -20,101 +16,15 @@ cursor = baseCore.cursor
cnx_11 = baseCore.cnx_
cursor_11 = baseCore.cursor_
def getList():
sql = 'Select id,item_id,year from clb_sys_attachment where type_id = 1'
cursor_11.execute(sql)
datas = cursor_11.fetchall()
total = len(datas)
page = total // 5
datas_lists = [list(datas[i:i + page]) for i in range(0, len(datas), page)]
return list(datas_lists)
def process_item(datas_list):
for datas in datas_list:
sid = datas[0]
xydm = datas[1]
year = datas[2]
if not xydm or xydm == 'None':
log.error(f'{sid}===没有信用代码')
return
if not year or year == 'None':
log.error(f'{sid}===没有年份')
return
body = {
"query": {
"bool": {
"must": [
{
"nested": {
"path": "labels",
"query": {
"match": {
"labels.relationId": f"{xydm}"
}
}
}
},
{
"term": {
"type.keyword": {
"value": "1"
}
}
},
{
"term": {
"year.keyword": {
"value": f"{year}"
}
}
}
]
}
},
"sort": [
{
"publishDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 10
}
res = es.search(index='researchreportdata', body=body)
if len(res['hits']['hits']) == 0:
log.error(f'{xydm}==={year}===未查询到')
for hit in res['hits']['hits']:
try:
sid_ = hit['_source']['attachmentIds'][0]
except:
log.error(f'{xydm}==={year}===es中未查询到附件id')
return
if str(sid) in str(sid_):
log.info(f'{xydm}==={year}===查询到')
else:
log.error(f'{xydm}==={year}===未查询到,查询到其它sid为{sid_}')
time.sleep(2)
threads = []
datas_lists = getList()
for datas_list in datas_lists:
t = threading.Thread(target=process_item, args=(datas_list,))
threads.append(t)
t.start()
# wait for all threads to finish
for t in threads:
t.join()
es.transport.close()
cursor_11.close()
cnx_11.close()
baseCore.close()
gg_social_list = [str(item[1]) + "|" + str(item[0]) + "|" + str(item[2]) for item in datas]
print('=======')
for item in gg_social_list:
r.rpush("NianBao:info", item)
if __name__ == "__main__":
getList()
import json
import threading
import time
import uuid
import redis
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
# use the connection pool
# cnx_ = baseCore.pool_11.connection()
# cursor_ = cnx_.cursor()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pathType = 'QYNotice/'
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
class EsMethod(object):
def __init__(self):
# create the Elasticsearch client and supply the account credentials
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self, index_name, xydm, year, att_id):
body = {
"query": {
"bool": {
"must": [
{
"nested": {
"path": "labels",
"query": {
"match": {
"labels.relationId": f"{xydm}"
}
}
}
},
{
"term": {
"type.keyword": {
"value": "1"
}
}
},
{
"term": {
"attachmentIds.keyword": att_id
}
},
{
"term": {
"year.keyword": {
"value": f"{year}"
}
}
}
]
}
},
"sort": [
{
"publishDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 10
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.attachmentIds',  # field 1
'hits.hits._source.title',
]  # field 2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
return result
def main(esMethod):
redis_conn = redis.Redis(connection_pool=pool)
info_ = redis_conn.lpop("NianBao:info")
if info_:
pass
else:
log.info("++++已没有数据++++")
return
info = info_.decode()
xydm = info.split('|')[0]
att_id = info.split('|')[1]
year = info.split('|')[2]
res = esMethod.queryatt(esMethod.index_name, xydm, year, att_id)
total = res['hits']['total']['value']
if total == 0:
log.error(f'{xydm}==={year}==={att_id}==未查询到')
redis_conn.lpush('NianBao:id', info)
return
def run_threads(num_threads,esMethod):
threads = []
for i in range(num_threads):
thread = threading.Thread(target=main, args=(esMethod,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
while True:
esMethod = EsMethod()
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod)
log.info(f'5线程 总耗时{time.time() - start}秒')
"""
Annual reports are uploaded to es.
content needs to be re-parsed
lang: language
origin: source, taken from the database
title: taken from the database
dic_info = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': origin,
'publishDate': datetime_string,
'sid': '1684032033495392257',
'sourceAddress': year_url, # original link
'summary': '',
'title': name_pdf.replace('.pdf', ''),
'type': 1,
'socialCreditCode': social_code,
'year': year
}
"""
# 1. Records without a year or credit code are stored in a separate redis key.
# 2. For records that have a credit code, id and year:
#    (1) the id in es needs to be updated to the id from the attachment table;
#    (2) use the credit code to count rows with the same name; if there are two, one of them is not
#        in es, so fetch the id that is in es and delete the row that is not in es.
# If there is exactly one row, that row needs to be uploaded.
# todo: handle the single-row cases first (update them); park the rest in another redis key.
import threading
import redis
import requests, re, time, pymysql, fitz
import urllib3
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
lock = threading.Lock()
taskType = '企业年报'
pathType = 'QYYearReport/'
def secrchATT(type_id, xydm):
sel_sql = '''select * from clb_sys_attachment where type_id=%s and item_id=%s '''
lock.acquire()
cursor_.execute(sel_sql, (type_id, xydm))
selects = cursor_.fetchall()
lock.release()
return selects
def main():
redis_conn = redis.Redis(connection_pool=pool)
info_ = redis_conn.lpop("NianBao:id")
if info_:
pass
else:
log.info("++++已没有数据++++")
return
info = info_.decode()
xydm = info.split('|')[0]
att_id = info.split('|')[1]
year = info.split('|')[2]
if not xydm or not year:
redis_conn.lpush('NianBao:info', info)
else:
selects = secrchATT(1, xydm)
if len(selects) > 1:
redis_conn.lpush('NianBao:info', info)
elif len(selects) == 1:
select = selects[0]
file_name = select[1]
origin = select[18]
create_time = select[13]
publishDate = select[21]
file_href = 'http://zzsn.luyuen.com' + str(select[5])
def run_threads(num_threads):
threads = []
for i in range(num_threads):
thread = threading.Thread(target=main, args=())
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
start = time.time()
num_threads = 5
run_threads(num_threads)
log.info(f'5线程 总耗时{time.time() - start}秒')
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Author: MENG
# @Time : 2022-3-18
import requests
from pyquery import PyQuery as pq
import time
import json
import pymongo
from requests.packages import urllib3
urllib3.disable_warnings()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='zzsn@9988').caiji['人民网-习讲话数据库_copy']
# add newly published items to the Xi speeches database
def get_content():
art_content_dict = {}
article_id_list = []
headers = {
'Proxy-Connection': 'keep-alive',
'Accept': '*/*',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': 'sfr=1; sso_c=0; __jsluid_h=5b9f09f6fdae46fadb89e1e02dca3238; wdcid=04fccdf5121158c0; wdses=72d07de4316a36a5; ci_session=4irerhndk5mc48dq7ldebjn5m47fqptg; wdlast=1646734820; ci_session=4irerhndk5mc48dq7ldebjn5m47fqptg'
}
for page in range(3, 0, -1):
url = f"http://jhsjk.people.cn/testnew/result?keywords=&isFuzzy=0&searchArea=0&year=0&form=0&type=0&page={page}&origin=%E5%85%A8%E9%83%A8&source=2"
try:
resp_json = requests.request("GET", url, headers=headers, verify=False).json()
time.sleep(0.2)
data_list = resp_json['list']
except:
print('请求错误1')
continue
for data_dict in data_list[::-1]:
article_id = data_dict['article_id']
is_article_id = db_storage.find_one({'id': article_id})
if is_article_id:
continue
title = data_dict['title']
pub_time = data_dict['input_date']
title_dict_list = db_storage.find({'title': title, 'is_repeat': ''})
is_repeat = ''
for title_dict in title_dict_list:
pub_time1 = title_dict['publishDate']
time_ = int(int(time.mktime(time.strptime(pub_time, "%Y-%m-%d %H:%M:%S"))) - int(time.mktime(time.strptime(pub_time1, "%Y-%m-%d %H:%M:%S"))))
if -259200 < time_ < 259200:
is_repeat = '1'
break
article_id_list.append(article_id)
href = 'http://jhsjk.people.cn/article/' + article_id
try:
href_text = requests.request("GET", href, headers=headers, verify=False).content
time.sleep(0.2)
doc_href = pq(href_text)
content_html1 = str(doc_href('.d2txt_con.clearfix'))
content_html2 = str(doc_href('.editor.clearfix'))
except:
print('请求错误2')
continue
content_html = content_html1 + '\n' + content_html2
content = pq(content_html).text()
if content.strip() == '':
print(href, '内容为空')
continue
origin = data_dict['origin_name']
a_dict = {
'id': article_id,
'title': title,
'author': '',
'origin': origin,
'content': content_html,
'publishDate': pub_time,
'sourceAddress': href,
'tags': [],
'is_repeat': is_repeat
}
art_content_dict[article_id] = a_dict
db_a_dict = a_dict.copy()
db_storage.insert_one(db_a_dict)
if is_repeat == '':
print(href)
if not article_id_list:
return
art_type_dict = {}
result_lists = [
['类型', '讲话', '706', '69'], ['类型', '会议', '701', '178'], ['类型', '活动', '702', '63'], ['类型', '考察', '703', '72'],
['类型', '会见', '704', '174'], ['类型', '出访', '705', '188'], ['类型', '函电', '707', '194'], ['类型', '其他', '708', '203'],
['领域', '经济', '101', '18'], ['领域', '政治', '102', '21'], ['领域', '文化', '103', '14'], ['领域', '社会', '104', '15'],
['领域', '生态', '105', '7'], ['领域', '党建', '106', '9'], ['领域', '国防', '107', '6'], ['领域', '外交', '108', '50'],
]
for result_list in result_lists:
sort = result_list[0]
sort_text = result_list[1]
print(sort_text)
if sort == '类型':
form = result_list[2]
type_ = '0'
else:
form = '0'
type_ = result_list[2]
# total_page = result_list[3]
total_page = 2
for page in range(1, int(total_page)):
url = f"http://jhsjk.people.cn/testnew/result?keywords=&isFuzzy=0&searchArea=0&year=0&form={form}&type={type_}&page={page}&origin=%E5%85%A8%E9%83%A8&source=2"
payload = {}
try:
resp_json = requests.request("GET", url, headers=headers, verify=False, data=payload).json()
data_list = resp_json['list']
except:
print('请求错误3')
continue
for data_dict in data_list:
article_id = data_dict['article_id']
if article_id in article_id_list:
type_dict = {
"type": sort,
"name": sort_text,
}
type_list = art_type_dict.get(article_id)
if type_list is None:
type_lists = [type_dict]
else:
type_lists = type_list + [type_dict]
art_type_dict[article_id] = type_lists
for key, value in art_content_dict.items():
tags = art_type_dict.get(key)
if tags is None:
tags = []
value['tags'] = tags
post_dict = value
db_storage.update_one({'id': post_dict['id']}, {'$set': {'tags': tags}})
if post_dict['is_repeat'] == '1':
continue
try:
del post_dict['is_repeat']
# labels = []
# for tags_dict in post_dict['tags']:
# labels_dict = {
# 'abelRemarks': tags_dict.get('type'),
# 'relationName': tags_dict.get('name'),
# }
# labels.append(labels_dict)
# aaa_dict = {
# 'sid': '1533647545473859586',
# 'title': post_dict['title'],
# 'content': '',
# 'contentWithTag': post_dict['content'],
# 'summary': '',
# 'author': '',
# 'origin': post_dict['origin'],
# 'publishDate': post_dict['publishDate'],
# 'sourceAddress': post_dict['sourceAddress'],
# 'labels': labels
# }
post_url = 'http://114.116.19.92:8088/api/reptile/autoSaveXJPSpeak'
headers = {'Content-Type': 'application/json'}
resp_json = requests.post(url=post_url, headers=headers, verify=False, data=json.dumps(post_dict)).json()
print('推送:', resp_json['msg'])
except:
print('数据传接口失败,正在重试!')
time.sleep(5)
db_storage.delete_one({'id': post_dict['id']})
continue
if __name__ == '__main__':
try:
get_content()
except Exception as e:
pass
"""
# local address: http://192.168.1.190:8008/api/reptile/autoSaveXJPSpeak
# server address: http://114.116.19.92:8088/api/reptile/autoSaveXJPSpeak
# request method: post
# request body, JSON format
{
"id": "32396175",
"title": "习近平:只有攥紧中国种子,才能端稳中国饭碗",
"author": "",
"origin": "人民日报客户端",
"content": "带标签",
"publishDate": "习近平:只有攥紧中国种子,才能端稳中国饭碗",
"sourceAddress": "http://jhsjk.people.cn/article/32396175",
"tags": [{
"type": "类型",
"name": "考察"
}, {
"type": "领域",
"name": "文化"
}]
}
"""
\ No newline at end of file
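# A minimal usage sketch (an assumption, not part of the original script) of the push interface
# documented above: build a payload in the documented shape and POST it as JSON, as the crawler
# does. The field values below are illustrative only; publishDate should be the real publish time.
import json
import requests

payload = {
    "id": "32396175",
    "title": "习近平:只有攥紧中国种子,才能端稳中国饭碗",
    "author": "",
    "origin": "人民日报客户端",
    "content": "<p>正文,带标签</p>",   # HTML body with tags
    "publishDate": "2022-03-18 08:00:00",
    "sourceAddress": "http://jhsjk.people.cn/article/32396175",
    "tags": [{"type": "类型", "name": "考察"}, {"type": "领域", "name": "文化"}],
}
resp = requests.post('http://114.116.19.92:8088/api/reptile/autoSaveXJPSpeak',
                     headers={'Content-Type': 'application/json'},
                     data=json.dumps(payload), verify=False).json()
print('push result:', resp['msg'])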