Commit 6e37a78a  Author: LiuLiYuan

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	shenji/sclx.py
from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd
import redis


def putCom():
    com_list = ['91210000558190456G', '914200001000115161', '911100007109310534', '9111000071093123XX',
'91110000100017643K', '91110000100018267J', '91110000MA01P657XY', '91230100127057741M',
'91440300190346175T', 'ZZSN22083000000003', '91110000400000720M', '911100001055722912',
'91110000100005220B', '911100001000094165', '91310000132200821H', '911100001000128855',
'91110000710924910P', '91110000710924929L', '911100007109225442', '9111000071092649XU',
'91310000MA1FL70B67', '911100007109311097', '912201011239989159', '911100007178306183',
'91310000MA7ALG04XG', '91110000100017707H', '91110000710929498G', '91110000100010249W',
'9151000062160427XG', '91310000MA1FL4B24G', '91110000400001889L', '9144030010001694XX',
'91110000100000825Q', '91110000100006194G', '91110000717828315T', '91110000100001043E',
'91110000MA005UCQ5P', '91110000710935732K', '91110000710930392Y', '91110000710930296M',
'911100007109303176', '91110000710925243K', '91110000100014071Q', '91110000100009563N',
'9111000071093107XN', '9111000010001002XD', '91110000100001852R', '91110000100001625L',
'911100001000080343', '91110000400008060U', '91110000101699383Q', '91110000100000489L',
'9111000071092868XL', '91110000100001035K', '911100004000011410', '91110000710933809D',
'91110000100010310K', '91133100MABRLCFR5Q', '91110000MA001HYK9X', '911100001000016682',
'911100007109279199', '12100000400010275N', '91110000710935636A', '91110000100024800K',
'9144000076384341X8', '91440000100005896P', '91110000MA01W8B394', '91110000717830650E',
'91110000100003057A', 'ZZSN22061600000001', '91310000MA1FL0LX06', '9111000010169286X1',
'91110000100010433L', '91110000100010660R', '91110000102016548J', '91110000100001676W',
'9111000071092200XY', '91133100MA0G9YKT8B', '9111000010000093XR', '91110000100006485K',
'91360702MA7FK4MR44', '91420100MA4L0GG411', '91110000101625149Q', '12100000400006022G',
'912302001285125661', '91110000100005888C', '911100007109250324', '91110000100024915R',
'9111000040000094XW', '91310000MA1FL1MMXL', '91110000100015058K', '91110000710929930X',
'91133100MA0GBL5F38', '9111000010000085X6', '91110000101100414N']
    df = pd.read_excel('D:\\企业数据\\数据组提供\\国内企业.xlsx')
    # Connect to the Redis database
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
    for i in range(len(df)):
        social_code = df['social_code'][i]
        com_name = df['name'][i]
        # print(social_code)
        if social_code in com_list:
            pass
        else:
            if 'ZZSN' in social_code or 'ZD' in social_code:
                continue
            else:
                item = social_code + '|' + com_name
                r.rpush('UpdateBasdeInfo:SocialCode_CompanyName', item)


def putCom_task():
    # Instantiate a scheduler
    scheduler = BlockingScheduler()
    # Run once on the first day of every month
    scheduler.add_job(putCom, 'cron', day=1, hour=0, minute=0)
    try:
        # redisPushData  # optionally run once before the schedule starts
        # putCom()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
        pass


if __name__ == '__main__':
    putCom_task()
\ No newline at end of file
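For reference, a minimal consumer-side sketch of the queue this job fills, assuming the same Redis instance and the `social_code|com_name` item format pushed above; the function name and the final print are illustrative placeholders, not part of the original change.

```python
import redis

r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)


def consume_one():
    # Pop one item pushed by putCom(); returns None when the list is empty
    raw = r.lpop('UpdateBasdeInfo:SocialCode_CompanyName')
    if raw is None:
        return None
    social_code, com_name = raw.decode().split('|', 1)
    return social_code, com_name


if __name__ == '__main__':
    pair = consume_one()
    if pair:
        print('next company to update:', pair)
```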
import pandas as pd
# from pandas import DataFrame as df
import pymysql
import redis
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Connect to MySQL and Redis
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)

with cnx.cursor() as cursor:
    select = """select relationName, relationId from klb_company"""
    cursor.execute(select)
    results = cursor.fetchall()
    for result in results:
        name = result[0]
        xydm = result[1]
        item = f'{name}|{xydm}'
        r.rpush('SousuoBaidu:companyname', item)

# List name
list_name = 'BaseInfoEnterpriseMz:gnqy_socialCode'
# Get all elements of the list
elements = r.lrange(list_name, 0, -1)
# Deduplicate: for each distinct element, remove every occurrence, then push a single copy back
for element in set(elements):
    count = r.lrem(list_name, 0, element)
    if count >= 1:
        r.rpush(list_name, element)
# Print the processed list
print(r.lrange(list_name, 0, -1))
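The deduplication above moves the surviving copy to the tail of the list, so the original order is lost. A hedged alternative sketch that keeps the first occurrence of each element and preserves order by rewriting the whole key in one transaction (assumes the list fits in memory and no other writer touches the key concurrently):

```python
def dedupe_keep_order(r, list_name):
    # Snapshot the list, keep only first occurrences, then atomically rewrite the key
    elements = r.lrange(list_name, 0, -1)
    seen = set()
    unique = [e for e in elements if not (e in seen or seen.add(e))]
    pipe = r.pipeline(transaction=True)
    pipe.delete(list_name)
    if unique:
        pipe.rpush(list_name, *unique)
    pipe.execute()
    return len(elements) - len(unique)  # number of duplicates dropped
```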
......@@ -45,15 +45,18 @@ def get_html(tycid, s, headers):
# div_part.find('div', class_='dimHeader_root__XTCLe')
except:
return -1
try:
tmp_field = div_part.find('div', class_='dim-tab-root').find('span').text
if '最新公示' in tmp_field:
total = div_part.find('div', class_='dim-tab-root').find('span').get_text().split('最新公示')[1].replace(' ', '')
return int(total)
else:
return -1
except:
return 0
if div_part is None:
return -2
else:
try:
tmp_field = div_part.find('div', class_='dim-tab-root').find('span').text
if '最新公示' in tmp_field:
total = div_part.find('div', class_='dim-tab-root').find('span').get_text().split('最新公示')[1].replace(' ', '')
return int(total)
else:
return -1
except:
return 0
@retry(tries=3, delay=1)
......@@ -64,7 +67,10 @@ def get_page(url, s, headers):
if res.status_code != 200:
raise
data_page = res.json()
total_page_ = data_page['data']['total']
try:
total_page_ = data_page['data']['total']
except:
raise
return total_page_
......@@ -77,11 +83,12 @@ def doJob():
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
# 'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'version': 'TYC-Web'
}
cookies_list, id_cookie = token.get_cookies()
cookies_list, id_cookie, user_name = token.get_cookies()
log.info(f'=====当前使用的是{user_name}的cookie======')
cookies = {}
for cookie in cookies_list:
cookies[cookie['name']] = cookie['value']
......@@ -90,7 +97,7 @@ def doJob():
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
# social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
social_code = '911101067916069050'
social_code = '91110108780992804C'
if social_code == None:
time.sleep(20)
continue
......@@ -163,6 +170,11 @@ def doJob():
log.info(f"{id}---{xydm}----{tycid}----请求失败----重新放入redis")
time.sleep(2)
continue
elif charge == -2:
# 该企业没有人员信息
log.info(f"{id}---{xydm}----{tycid}----没有核心人员")
continue
elif charge == 0:
log.info(f"{id}---{xydm}----{tycid}----没有最新公示")
url1 = f'https://capi.tianyancha.com/cloud-company-background/company/dim/staff?_={t}&gid={tycid}&pageSize=20&pageNum=1'
......@@ -240,6 +252,8 @@ def doJob():
pass
else:
log.info(f'{id}---{xydm}----{tycid}----没有高管信息')
# todo: 关闭连接
res.close()
if flag == 1:
for one_info in list_all:
name = one_info['name']
......
......@@ -223,7 +223,9 @@ def spiderinfo(company_url, receptname, file_name):
else:
sourceUpdateTime = paserTime(sourceUpdateTime_).strftime("%Y-%m-%d %H:%M:%S")
except:
redaytowork(com_name, social_code, file_name)
log.info(f'天眼查无该企业{social_code}')
return
aa_dict = {
'name': receptname, # 企业名称
'shortName': None, # 企业简称
......@@ -326,7 +328,7 @@ if __name__ == '__main__':
driver.get('https://www.tianyancha.com/')
while True:
nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
file_name = f'./data/国内企业基本信息采集情况.xlsx'
file_name = f'./data/国内企业基本信息更新.xlsx'
file.createFile(file_name)
# cookies_list, id_cookie = token.get_cookies()
# cookies = {}
......@@ -336,8 +338,8 @@ if __name__ == '__main__':
# s.cookies.update(cookies)
start_time = time.time()
# 获取企业信息
# company_field = baseCore.redicPullData('BaseInfoEnterpriseUptime:gnqy_socialCode')
company_field = '913100006073602992|光明乳业股份有限公司'
company_field = baseCore.redicPullData('BaseInfoEnterpriseUptime:gnqy_socialCode')
# company_field = '913100006073602992|光明乳业股份有限公司'
if company_field == 'end':
# 本轮处理完毕,需要发送邮件,并且进入下一轮
......@@ -398,7 +400,7 @@ if __name__ == '__main__':
count = redaytowork(com_name, social_code, file_name)
else:
company_url = 'https://www.tianyancha.com/company/' + tycid
spiderinfo(company_url, social_code, file_name)
spiderinfo(company_url, com_name, file_name)
time.sleep(10)
# break
baseCore.close()
\ No newline at end of file
......@@ -59,7 +59,8 @@ class Token():
result = db_storage.find_one(query, sort=[('updateTime', 1)])
cookies = result['cookies']
id_token = result['_id']
return cookies, id_token
user_name = result['name']
return cookies, id_token, user_name
# 删除失效的token
def delete_token(self, cookie_):
......
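The `get_cookies` change above rotates accounts by always taking the record with the oldest `updateTime`. A minimal sketch of that rotation pattern, assuming a pymongo collection with `cookies`, `name` and `updateTime` fields; the connection URI, collection name and the bump-on-use step are assumptions for illustration, not taken from the diff.

```python
from datetime import datetime

import pymongo

# Placeholder URI and collection name for illustration only
db_storage = pymongo.MongoClient('mongodb://localhost:27017').ZZSN['tyc_cookies']


def get_cookies():
    # Least-recently-used record first, so accounts are used in rotation
    result = db_storage.find_one({}, sort=[('updateTime', 1)])
    if result is None:
        return None, None, None
    # Mark it as just used so the next call picks a different account (assumed behaviour)
    db_storage.update_one({'_id': result['_id']}, {'$set': {'updateTime': datetime.now()}})
    return result['cookies'], result['_id'], result['name']
```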
"""
Fetch all the titles from ES
"""
import redis
from elasticsearch import Elasticsearch
from base import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name):
body = {
"query": {
"bool": {
"must": [
{
"nested": {
"path": "labels",
"query": {
"match": {
"labels.relationId": "91330000747735638J"
}
}
}
},
{
"range": {
"createDate": {
"gte": "2024-02-26T13:00:00",
"lte": "2024-02-27T00:00:00"
}
}
},
{
"term": {
"type.keyword": {
"value": "3"
}
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 100
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.title',
'hits.hits._source.origin',
'hits.hits._source.publishDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
# log.info(result)
return result
if __name__ == '__main__':
es_method = EsMethod()
# 连接Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
result = es_method.queryatt('researchreportdata')
total = result['hits']['total']['value']
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
log.info(f'---第1页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
origin = mms['_source']['origin']
pub_time = mms['_source']['publishDate']
try:
log.info(f'{id}--{title}--{origin}--')
item = id + "|" + title
# r.lrem(f'XJPdatabase:id_2', 0, item)
r.lpush(f'91330000747735638J:id', item)
except:
continue
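The query above fetches only the first 100 hits even though `track_total_hits` reports the full count. A hedged sketch of paging through all hits, mirroring the `from`/`size` approach used by the `subjectdatabase` script later in this changeset; `query_fn` is an assumed callable that runs the search with a given offset.

```python
def page_through(query_fn, total, page_size=100):
    """Collect hits page by page until `total` documents have been seen.

    query_fn(offset) is assumed to return the raw ES response (with hits.hits),
    e.g. a queryatt variant that sets "from": offset and "size": page_size.
    """
    collected = []
    for offset in range(0, total, page_size):
        result = query_fn(offset)
        hits = result.get('hits', {}).get('hits', [])
        if not hits:
            break
        collected.extend(hits)
    return collected
```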
......@@ -31,7 +31,7 @@ class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'),timeout=300 )
self.index_name='researchreportdata'
self.index_name='researchreportdata_2024'
'''
删除
......@@ -47,13 +47,13 @@ if __name__ == "__main__":
redis_conn = redis.Redis(connection_pool=pool)
while True:
# 从redis中读取数据,去附件表中根据title查询,更新查到的附件id
item = redis_conn.lpop('YanBao:id')
item = redis_conn.lpop('91330000747735638J:id')
if item:
log.info(item)
id = item.decode()
id = int(item.decode().split('|')[0])
try:
esMethod.delete(esMethod.index_name,id)
except:
except Exception as e:
continue
else:
log.info('已删除完毕')
......
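The body of `esMethod.delete` is collapsed in this hunk; for reference, a minimal sketch of what a delete-by-`_id` helper with the same client settings would look like. This is an assumption about the method's behaviour, not the actual implementation.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)


def delete_doc(index_name, doc_id):
    # Delete a single document by its _id; elasticsearch-py raises NotFoundError if it is missing
    return es.delete(index=index_name, id=doc_id)
```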
......@@ -51,7 +51,7 @@ def parse_excel():
def get_content1():
print_result_list = []
result_dict_list = []
# query = {"专家库主键id":"1204"}
# query = {"专家库主键id":"141"}
# for db_dict in db_storage.find(query):
for db_dict in db_storage.find():
del db_dict['_id']
......
# 中央全面深化改革委员会会议
import json
import sys
import time
import redis
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from kafka import KafkaProducer
headers = {
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
header = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
......@@ -26,22 +32,50 @@ headers = {
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection': 'keep-alive',
'Cookie': 'cna=HcAKHtgXUG4CAQHBO1G6ZJYK',
'Host': 'news.12371.cn',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0',
'sec-ch-ua': '"Not A(Brand";v="99", "Microsoft Edge";v="121", "Chromium";v="121"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
if __name__ == "__main__":
# 中央全面深化改革委员会会议
r = redis.Redis(host='114.115.236.206', port=6379, password='clbzzsn', db=5)
# 中央全面深化改革领导小组会议
# url_list = ['https://www.12371.cn/special/zyqmshggldxzhy19/', 'https://www.12371.cn/special/zyqmshggldxzhy19/']
url_list = ['https://www.12371.cn/special/zyqmshggldxzhy19/']
for url in url_list:
request = requests.get(url=url, headers=headers)
soup = BeautifulSoup(request.content, 'html.parser')
request.encoding = request.apparent_encoding
# print(soup)
info_html = soup.find('div', id='SUBD1663831285709121').find('ul', class_='ul_list')
ul_list = info_html.find_all('li')
for ul in ul_list:
url = 'https://www.12371.cn/special/zyqmshggldxzhy19/'
request = requests.get(url=url, headers=header)
soup = BeautifulSoup(request.content, 'html.parser')
# print(soup)
request.encoding = request.apparent_encoding
# print(soup)
# info_html = soup.find('div', id='SUBD1663831285709121').find('ul', class_='ul_list')
info_html_list = soup.find_all('div', class_='dyw1023_right_list01 hyty')
flag = 1
for info_html in info_html_list:
if flag == 1:
info_code = 'IN-20230816-0004'
sid = '1691633319715676162'
else:
sid = '1691633869186277378'
info_code = 'IN-20230816-0005'
ul_list = info_html.find('ul', class_='ul_list').find_all('li')
for ul in ul_list[::-1]:
publishDate_ = str(ul.find('span').text)
date_obj= datetime.strptime(publishDate_, "%Y年%m月%d日")
publishDate = date_obj.strftime('%Y-%m-%d')
......@@ -51,18 +85,27 @@ if __name__ == "__main__":
newsUrl = ul.find('a')['href']
summary = ul.find('a').text
# todo: 链接判重
news_request = requests.get(url=newsUrl, headers=headers)
try:
flag = r.sismember(info_code, newsUrl)
if flag:
log.info('信息已采集入库过')
continue
except Exception as e:
continue
news_request = requests.get(url=newsUrl, headers=headers, allow_redirects=False)
news_soup = BeautifulSoup(news_request.content, 'html.parser')
print(news_soup)
title = news_soup.find('h1', class_='big_title').text
source = news_soup.find('div', class_='title_bottom').find('i').text
contentwithTag = news_soup.find('div', class_='word')
content = contentwithTag.text
if url == 'https://www.12371.cn/special/zyqmshggldxzhy19/':
sid = '1691633319715676162'
else:
sid = '1691633869186277378'
# print(news_soup)
try:
title = news_soup.find('h1', class_='big_title').text
source = news_soup.find('div', class_='title_bottom').find('i').text
contentwithTag = news_soup.find('div', class_='word')
content = contentwithTag.text
except Exception as e:
log.error(f'解析网页出错{newsUrl}')
continue
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info ={
'id': '1681549361661489154' + str(int(time.time()*1000)),
'title': title,
......@@ -79,6 +122,7 @@ if __name__ == "__main__":
'createDate': time_now,
}
r.sadd(info_code, newsUrl)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
try:
kafka_result = producer.send("research_center_fourth",
......@@ -89,4 +133,5 @@ if __name__ == "__main__":
print(e)
print('发送kafka异常!')
finally:
producer.close()
\ No newline at end of file
producer.close()
flag += 1
\ No newline at end of file
......@@ -174,6 +174,76 @@ def zyzsjg():
# sendKafka(data)
print(data)
def dfrwk():
datas_df = []
url_df = 'http://district.ce.cn/zt/rwk/'
req = requests.get(url=url_df, headers=headers)
soup = BeautifulSoup(req.content, 'html.parser')
df_list = soup.find('div', class_='left1').find_all('div')
for df in df_list:
df_place = df.text.replace('\n', '')
try:
df_href = df.find('a')['href']
except:
df_href = ''
if df_href:
datas_df.append([df_place,df_href])
print(datas_df)
peoples = []
for data in datas_df:
place = data[0]
href = data[1]
req_df = requests.get(url=href, headers=headers)
soup_df = BeautifulSoup(req_df.content, 'html.parser')
df_list_df = soup_df.find_all('div', class_='left2')
for df in df_list_df:
try:
rwpart = df.find('div', class_='ren2')
except:
log.error(f'{place}===={href}')
continue
if rwpart:
pass
else:
continue
tr_list = rwpart.find_all('tr')
for tr in tr_list:
td_list = tr.find_all('td')
if len(td_list) == 3:
leader = td_list[1].text
try:
leader_href = td_list[1].find('a')['href']
except:
leader_href = ''
# continue
position = td_list[2].text
print(place, leader, position)
if len(td_list) == 2:
leader = td_list[0].text
try:
leader_href = td_list[0].find('a')['href']
except:
leader_href = ''
# continue
position = td_list[1].text
print(place, leader, position)
people = {
'name': leader, # 姓名
'sex': '', # 性别
'work': position, # 职务
'birthplace': '', # 出生地
'birthday': '', # 出生日期
'company': '', # 曾任单位
'city': '', # 关联城市
'school': '', # 毕业院校
'province': '', # 省或直辖市
'type': 3, # 直属类别(1:部委人物库 2:中直任务库 3:地方人物库)
'department': '', # 部门
'headSculpture': '', # 照片链接
}
# print(name)
peoples.append(people)
def gwybw_task():
# 实例化一个调度器
......@@ -200,11 +270,12 @@ def zyzsjg_task():
if __name__ == "__main__":
try:
gwybw_task()
except:
log.error('部委人物采集出错')
try:
zyzsjg_task()
except:
log.error('中直人物采集出错')
# try:
# gwybw_task()
# except:
# log.error('部委人物采集出错')
# try:
# zyzsjg_task()
# except:
# log.error('中直人物采集出错')
dfrwk()
import os
......@@ -46,22 +46,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
if category == '.pdf':
try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
#todo:判断内容是否成功
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
# todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
pass
except:
time.sleep(3)
continue
page_size = 1
log.error(f'文件损坏')
return retData
else:
for i in range(0, 3):
try:
page_size = 1
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
#todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
except:
time.sleep(3)
continue
name = str(getuuid()) + category
try:
result = getOBSres(pathType, name, response)
......@@ -85,7 +104,7 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
#baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
......
import os
......@@ -48,6 +48,7 @@ def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers = {}
category = os.path.splitext(pdf_url)[1]
......@@ -56,16 +57,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
if category == '.pdf':
try:
ip = baseCore.get_proxy()
response = requests.get(pdf_url, headers=headers,verify=False,proxies=ip, timeout=20)
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
break
except Exception as e:
time.sleep(60)
continue
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
# todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
pass
except:
log.error(f'文件损坏')
return retData
else:
for i in range(0, 3):
try:
page_size = 1
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
#todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
except:
time.sleep(3)
continue
name = str(getuuid()) + category
try:
......@@ -73,12 +99,6 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except:
log.error(f'OBS发送失败')
return retData
try:
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
except:
log.error(f'文件损坏')
return retData
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
......@@ -95,11 +115,12 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
#baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
......
......@@ -56,7 +56,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(20)
time.sleep(80)
s = requests.session()
#获取到token和cookies
......
United Nations: https://www.un-ilibrary.org/content/papers/27082822
World Trade Organization
https://docs.wto.org/dol2fe/Pages/FE_Search/FE_S_S006.aspx?Language=English&SourcePage=FE_B_009&Context=Script&DataSource=Cat&Query=(%40Symbol%3d%22WT%2fLET*%22+AND+(%40Title%3d(modifications+OR+rectifications)+AND+schedule))&languageUIChanged=true
OECD
https://www.oecd-ilibrary.org/economics/oecd-policy-responses-on-the-impacts-of-the-war-in-ukraine_dc825602-en
International operations - EU
https://ec.europa.eu/eurostat/databrowser/explore/all/tb_eu?lang=en&display=list&sort=category
\ No newline at end of file
"""
Foreign think tanks - EU / OECD
"""
import json
import time
import pymongo
from bs4 import BeautifulSoup
import requests
from datetime import datetime
from kafka import KafkaProducer
from retry import retry
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN[
'国外智库']
@retry(tries=2, delay=5)
def sendKafka(dic):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
kafka_result = producer.send("research_center_fourth",
json.dumps(dic, ensure_ascii=False).encode('utf8'))
log.info(f'{dic["sourceAddress"]}传输成功')
def secrchATT(item_id, retData, type_id, order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
baseCore.cursor_.execute(sel_sql, (item_id, retData['path'], type_id, order_by))
selects = baseCore.cursor_.fetchone()
return selects
# 插入到att表 返回附件id
def tableUpdate(retData, file_name, num, publishDate,origin):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
object_key = full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1]
Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
file_name+'.pdf', type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, object_key, 'zzsn', publishDate,origin)
baseCore.cursor_.execute(Upsql, values) # 插入
baseCore.cnx_.commit() # 提交
baseCore.getLogger().info("更新完成:{}".format(Upsql))
selects = secrchATT(item_id, retData, type_id, order_by)
id = selects[0]
return id
def save_data(dic_news):
aaa_dic = {
'附件id': dic_news['attachmentIds'],
'网址': dic_news['sourceAddress'],
'tid': '',
'来源': f"经济合作与发展组织",
'创建时间': dic_news['createDate'],
'带标签内容': dic_news['contentWithTag'][:100],
'发布时间': dic_news['publishDate'],
'标题': dic_news['title']
}
db_storage.insert_one(aaa_dic)
@retry(tries=2, delay=5)
def translate(title, contentWithTag):
headers = {
'Content-Type': 'application/json',
}
dic_info = {
'title': title,
# 'summary': '<div>apple</div>',
'contentWithTag': contentWithTag
}
dic_info = json.dumps(dic_info)
req = requests.post('http://117.78.23.14:5001/translate', data=dic_info, headers=headers)
dataJson = req.json()
if dataJson['status'] == 'failed':
raise
titleRaw = dataJson['title']
contentWithTagRaw = dataJson['contentWithTag']
titleRaw = BeautifulSoup(titleRaw,'html.parser')
titleRaw = titleRaw.text
contentWithTagRaw = BeautifulSoup(contentWithTagRaw,'html.parser')
return titleRaw, contentWithTagRaw
def doJob():
num = 1
url = 'https://www.oecd-ilibrary.org/economics/oecd-policy-responses-on-the-impacts-of-the-war-in-ukraine_dc825602-en?page=1'
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Cookie': 'JSESSIONID=BHezogPwi8NJVECsKXCXqijdQ00-yMJHw_gR8wiC.ip-10-240-5-121; __cf_bm=c2byUypnSjXPS_UFDM7BMRGDxN6AQEkNVUjzw9HuSq8-1707054653-1-AbbI7JWWkfWKVGi8SKI06f0jGEjPdk5kvHAIRRpBHSSSnmxj1IcvGUT8+/O6R0U2RLZJECZdUzZIXAwFuEz5lPo=; _gcl_au=1.1.201344533.1707054655; _gid=GA1.2.557164000.1707054655; cb-enabled=enabled; cf_clearance=6tK6.WKHJbXXoV4NTgbyHRhetRxMdWPZofwlv01F65Y-1707054656-1-AfrYlWnLLZFC1sKxeFVQintPrZnjvjoJSZwRRhAYwqRHGdWbU5IFZQDJZJM21l20Tj6gk4JxNobWT0wGzp1Dgjw=; _ce.irv=new; cebs=1; _ce.clock_event=1; _ce.clock_data=72%2C123.149.3.159%2C1%2C9c1ce27f08b16479d2e17743062b28ed; custom_cookie_AB=1; AWSALB=I/eGQ0glcxuROskD1JKEl/dqsqElpmo/MnwLboJZJB2QthQFFWnLA3gzuJTskEaZxJD7VuWEEsqjhLVvhq4q2Wt0RebuRhukeHpKvgmGMelxpn/RiDmehyvxTOiS; AWSALBCORS=I/eGQ0glcxuROskD1JKEl/dqsqElpmo/MnwLboJZJB2QthQFFWnLA3gzuJTskEaZxJD7VuWEEsqjhLVvhq4q2Wt0RebuRhukeHpKvgmGMelxpn/RiDmehyvxTOiS; _gat_UA-1887794-2=1; _dc_gtm_UA-136634323-1=1; _ga_F5XZ540Q4V=GS1.1.1707054655.1.1.1707055119.7.0.0; _ga=GA1.1.1014316406.1707054655; _ga_F7KSNTXTRX=GS1.1.1707054655.1.1.1707055119.0.0.0; cebsp_=5; _ce.s=v~212f033193b9432855ae8335d6d3969cc1f8b751~lcw~1707055134688~lva~1707054658247~vpv~0~v11.fhb~1707054659602~v11.lhb~1707055126493~v11.cs~325107~v11.s~6d7ba630-c364-11ee-aba8-136dbbf9a447~v11.sla~1707055134688~v11.send~1707055135439~lcw~1707055135439',
'Referer': 'https://www.oecd-ilibrary.org/economics/oecd-policy-responses-on-the-impacts-of-the-war-in-ukraine_dc825602-en?page=2',
'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
req = requests.get(url=url, headers=headers)
soup = BeautifulSoup(req.content, 'html.parser')
div_part = soup.find_all('div', class_='col-xs-12 body-section')[1]
div_list = div_part.find_all('div', class_='row panel')
for div in div_list:
start_time = time.time()
title = div.find('div', class_='col-lg-7 col-xs-12 resume-item').find('p', class_='intro-item').find('strong', class_='book-title').text
href = 'https://www.oecd-ilibrary.org' + div.find('div', class_='col-lg-7 col-xs-12 resume-item').find('p', class_='intro-item').find('a')['href']
is_href = db_storage.find_one({'网址': href})
if is_href:
log.info(f'{href}===已采集')
continue
pubtime_ = div.find('div', class_='col-lg-7 col-xs-12 resume-item').find('p', class_='intro-item').find('strong', class_='book-title gray').text
# 定义原始时间的格式
time_format = "%d %b %Y"
# 转换为标准时间
standard_time = datetime.strptime(pubtime_, time_format).strftime("%Y-%m-%d")
if standard_time > '2023-01-30':
pass
else:
break
year = standard_time[:4]
pdf_part = div.find('div', class_='col-lg-5 col-xs-12 actions-item').find('ul', class_='actions').find_all('li')[1].find('a').get('href')
pdf_url = 'https://www.oecd-ilibrary.org' + pdf_part
req_news = requests.get(url=href, headers=headers)
soup_news = BeautifulSoup(req_news.content, 'html.parser')
# print(title, standard_time, pdf_url, href)
contentWithTag = soup_news.find('div', class_='description js-desc-fade show-all')
content = contentWithTag.get_text()
# todo:翻译
try:
titleRaw, contentWithTagRaw = translate(str(title), str(contentWithTag))
log.info(f'{href}===翻译成功')
except Exception as e:
log.error(f'{href}===翻译失败==={e}')
continue
retData = baseCore.uptoOBS(pdf_url, title, 15, '', pathType, taskType, start_time, create_by)
num += 1
id_list = []
if retData['state']:
att_id = tableUpdate(retData, title, num, standard_time, '经济合作与发展组织')
if att_id:
id_list.append(att_id)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
lang = baseCore.detect_language(content)
contentRaw = contentWithTagRaw.text
contentWithTagRaw = str(contentWithTagRaw)
dic = {
'id': f'1620244462491893761{int(time.time())}',
'subjectId': '1620244462491893761',
'checkStatus': 1,
'deleteFlag': 0,
'topNum': 0,
'content': content,
'contentRaw': contentRaw,
'contentWithTag': str(contentWithTag),
'contentWithTagRaw': contentWithTagRaw,
'createDate': now,
'labels': [
{'labelMark': 'organization', 'relationId': '1619903523269271554', 'relationName': '经济合作与发展组织'}],
'lang': lang,
'origin': '经济合作与发展组织',
'publishDate': standard_time,
'sourceAddress': href,
'title': title,
'titleRaw': titleRaw,
'updateDate': now,
'attachmentIds':id_list
}
sendKafka(dic)
try:
save_data(dic)
except:
log.error(f'{href}===数据库保存失败')
# break
if __name__ == "__main__":
pathType = 'PolicyDocuments/'
taskType = '国外智库-经合组织'
create_by = 'XueLingKun'
doJob()
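A small aside on the cutoff check in `doJob`: comparing `standard_time > '2023-01-30'` as plain strings is valid only because both sides are zero-padded `%Y-%m-%d` strings, which sort lexicographically in the same order as the dates themselves. A minimal illustration (the sample publish date is made up):

```python
from datetime import datetime

pubtime_ = '5 Feb 2024'  # example value in the "%d %b %Y" format used on the page
standard_time = datetime.strptime(pubtime_, "%d %b %Y").strftime("%Y-%m-%d")

# Zero-padded ISO dates compare correctly as strings...
assert standard_time > '2023-01-30'
# ...which would not hold for non-padded or day-first formats
assert not ('2/5/2024' > '30/01/2023')
```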
......@@ -119,17 +119,17 @@ if __name__=='__main__':
# or '中共' in author or '记者' in author or '新闻社' in author\
# or '党委' in author or '调研组' in author or '研究中心' in author\
# or '委员会' in author or '博物' in author or '大学' in author or '联合会' in author :
# if '(' in author or '本刊' in author \
# or '记者' in author or '新闻社' in author \
# or '”' in author\
# or '大学' in author or '洛桑江村' in author:
# continue
if '国资委党委' in author:
pass
else:
if '(' in author or '本刊' in author \
or '记者' in author or '新闻社' in author \
or '”' in author\
or '大学' in author or '洛桑江村' in author:
continue
# if '国资委党委' in author:
# pass
# else:
# continue
new_href = new.find('a')['href']
is_member = r.sismember('qiushileaderspeech_two::' + period_title, new_href)
is_member = r.sismember('qiushileaderspeech::' + period_title, new_href)
if is_member:
continue
new_title = new.find('a').text.replace('\u3000',' ').lstrip(' ').replace('——', '').replace('\xa0', '')
......@@ -165,7 +165,7 @@ if __name__=='__main__':
}
log.info(dic_news)
if sendKafka(dic_news):
r.sadd('qiushileaderspeech_two::' + period_title, new_href)
r.sadd('qiushileaderspeech::' + period_title, new_href)
log.info(f'采集成功----{dic_news["sourceAddress"]}')
import csv
import time
import pandas as pd
import redis
import requests
from bs4 import BeautifulSoup
from retry import retry
from selenium.common import StaleElementReferenceException
from base import BaseCore
from requests.packages import urllib3
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
urllib3.disable_warnings()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
......@@ -33,54 +40,100 @@ headers = {
'sec-ch-ua-platform': '"Windows"',
}
# todo:使用模拟浏览器
def create_driver():
path = r'D:\soft\msedgedriver.exe'
@retry(tries=2, delay=5)
def getHref(Keywords):
data = {
'Menu': 'law',
'Keywords': Keywords,
'PreKeywords': Keywords,
'SearchKeywordType': 'Title',
'MatchType': 'Exact',
'RangeType': 'Piece',
'Library': 'chl',
'ClassFlag': 'chl',
'GroupLibraries': '',
'QuerySearchCondition': 'Title+Exact+Piece+0',
'QueryOnClick': False,
'AfterSearch': True,
'RequestFrom': 'btnSearch',
'SearchInResult': '',
'PreviousLib': 'chl',
'IsSynonymSearch': 'false',
'RecordShowType': 'List',
'ClassCodeKey': ',,,,,,',
'IsSearchErrorKeyword': '',
'FirstQueryKeywords': Keywords,
'FirstQueryKeywordType': 'Title',
'IsSynonymSearch': 'false',
'X-Requested-With': 'XMLHttpRequest',
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
ip = baseCore.get_proxy()
url = 'https://sclx.pkulaw.com/law/chl'
req = requests.get(url, headers=headers, data=data, proxies=ip, verify=False)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
driver = webdriver.Edge(executable_path=path, capabilities=options)
return driver
@retry(tries=2, delay=5)
def getHref(Keywords, driver):
# data = {
# 'Menu': 'law',
# 'Keywords': Keywords,
# 'PreKeywords': Keywords,
# 'SearchKeywordType': 'Title',
# 'MatchType': 'Exact',
# 'RangeType': 'Piece',
# 'Library': 'chl',
# 'ClassFlag': 'chl',
# 'GroupLibraries': '',
# 'QuerySearchCondition': 'Title+Exact+Piece+0',
# 'QueryOnClick': False,
# 'AfterSearch': True,
# 'RequestFrom': 'btnSearch',
# 'SearchInResult': '',
# 'PreviousLib': 'chl',
# 'IsSynonymSearch': 'false',
# 'RecordShowType': 'List',
# 'ClassCodeKey': ',,,,,,',
# 'IsSearchErrorKeyword': '',
# 'FirstQueryKeywords': Keywords,
# 'FirstQueryKeywordType': 'Title',
# 'IsSynonymSearch': 'false',
# 'X-Requested-With': 'XMLHttpRequest',
# }
driver.get('https://sclx.pkulaw.com/law')
# ip = baseCore.get_proxy()
driver.find_element(By.ID, 'txtSearch').send_keys(Keywords)
time.sleep(0.5)
driver.find_element(By.CLASS_NAME, 'btn-search').click()
wait = WebDriverWait(driver, 30)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "accompanying-wrap")))
getpart = driver.find_element(By.CLASS_NAME, 'accompanying-wrap')
# li_list = getpart.find_elements(By.TAG_NAME, 'li')
# for li in li_list:
driver.execute_script("arguments[0].scrollIntoView();", getpart)
time.sleep(2)
try:
tag = soup.find('div', class_='accompanying-wrap').find('div', class_='item').find('li', attrs={
'name': 'HistoryAssociation'})
href = 'https://sclx.pkulaw.com' + tag.get('url')
except:
href = ''
time.sleep(1)
element = getpart.find_element(By.XPATH, ".//div/div[1]/div[3]/div/div[1]/ul/li[@name='HistoryAssociation']")
time.sleep(1)
driver.execute_script("arguments[0].scrollIntoView();", element)
time.sleep(1)
element.click()
href = 'https://sclx.pkulaw.com' + element.get_attribute("url")
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "a-tab-col")))
info_part = driver.find_element(By.CLASS_NAME, 'a-tab-col').find_element(By.XPATH, './/div[@name="HistoryAssociation"]')
# except Exception as e:
except StaleElementReferenceException:
# 元素已经stale,重新定位元素
element = driver.find_element(By.XPATH, ".//div/div[1]/div[3]/div/div[1]/ul/li[@name='HistoryAssociation']")
element.click() # 再次尝试与元素交互
href = 'https://sclx.pkulaw.com' + element.get_attribute("url")
# log.info(e)
# href = ''
return href
# url = 'https://sclx.pkulaw.com/law/chl'
# req = requests.post(url, headers=headers, data=data, proxies=ip)
# req = requests.post(url, headers=headers, data=data, verify=False)
# req.encoding = req.apparent_encoding
# soup = BeautifulSoup(req.text, 'html.parser')
# try:
# tag = soup.find('div', class_='accompanying-wrap').find('div', class_='item').find('li', attrs={
# 'name': 'HistoryAssociation'})
# href = 'https://sclx.pkulaw.com' + tag.get('url')
# except:
# href = ''
# return href
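The `getHref` rewrite above re-locates the element after a `StaleElementReferenceException` and reads its custom `url` attribute. A small, hedged helper showing that retry-and-relocate pattern in reusable form; the locator, attempt count and returned attribute follow the code above but the helper itself is only a sketch.

```python
from selenium.common import StaleElementReferenceException
from selenium.webdriver.common.by import By


def click_and_get_url(driver, xpath, attempts=3):
    """Locate, scroll to and click an element, re-locating it if it goes stale."""
    last_exc = None
    for _ in range(attempts):
        try:
            element = driver.find_element(By.XPATH, xpath)
            driver.execute_script("arguments[0].scrollIntoView();", element)
            element.click()
            return element.get_attribute("url")
        except StaleElementReferenceException as e:
            last_exc = e  # the DOM was re-rendered between locating and clicking; try again
    raise last_exc
```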
@retry(tries=2, delay=5)
def getData(href):
data = []
ip = baseCore.get_proxy()
req = requests.get(href, headers=headers, proxies=ip, verify=False)
@retry(tries=3, delay=5)
def getData(href, Keywords):
term = Keywords
# ip = baseCore.get_proxy()
# req = requests.get(href, headers=headers, proxies=ip)
req = requests.get(href, headers=headers)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
li_list = soup.find_all('li')
......@@ -90,22 +143,59 @@ def getData(href):
theme = li.find('div', class_='theme').text.strip()
except:
theme = ''
try:
relevance = li.find('div', class_='relevance').text.strip()
except:
relevance = ''
data.append([publishDate,theme,relevance])
time.sleep(1)
return data
# try:
# relevance = li.find('div', class_='relevance').text.strip()
# except:
# relevance = ''
# log.info(f'{publishDate}==={theme}==')
term += ',' + theme + '_' + publishDate
log.info(term)
if ',' not in term or '_' not in term:
r.rpush('ShenjisclxError:', Keywords)
return None
return term
def doJob():
data = []
Keywords = '中华人民共和国公司法(2023修订)'
href = getHref(Keywords)
if href:
data += getData(href)
df = pd.DataFrame(data)
print(df)
data_list = []
driver = create_driver()
driver.maximize_window()
while True:
try:
Keywords = r.lpop('Shenjisclx:').decode()
# Keywords = '中华人民共和国银行业监督管理法(2006修正)'
except:
Keywords = ''
if Keywords:
try:
href = getHref(Keywords, driver)
if href:
r.rpush('ShenjisclxHref:', f'{Keywords}|{href}')
log.info(f'{Keywords}====找到=== {href}')
term = getData(href, Keywords)
else:
term = Keywords + ','
r.rpush('ShenjisclxHrefNull:', f'{Keywords}|{href}')
log.info(f'{Keywords}====未找到')
if term:
# data_list.append(term)
r.rpush('ShenjisclxReault:', term)
except:
r.rpush('ShenjisclxError:', Keywords)
continue
time.sleep(2)
else:
break
# print(data_list)
# with open('./output.csv', 'w', newline='') as file:
# writer = csv.writer(file)
#
# # 写入数据
# for row in data_list:
# writer.writerow(row.split(','))
#
# print('数据已成功写入CSV文件')
if __name__ == '__main__':
doJob()
......
import csv

# Data to write
# data = [
#     ['Name', 'Age', 'City'],
#     ['Alice', 25, 'New York'],
#     ['Bob', 30, 'Los Angeles'],
#     ['Charlie', 35, 'Chicago']
# ]
data = ['aaaa,bbbb,cccc', 'aaaa,cccc,ffff']
# Open the CSV file for writing
with open('./output.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    # Write the rows
    for row in data:
        writer.writerow(row.split(','))
print('数据已成功写入CSV文件')
import csv
import redis

r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)

if __name__ == "__main__":
    with open('./output0220_1.csv', 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        while True:
            try:
                term_ = r.lpop('ShenjisclxError:').decode()
                term = str(term_) + ','
            except:
                term = ''
            if term == '':
                break
            else:
                # Write the row
                writer.writerow(str(term).split(','))
......@@ -170,5 +170,71 @@ for data in datas:
# f.write(dic_info_)
# break
# req = requests.post('http://192.168.1.236:5000/translate',data=dic_info_,headers=headers)
req = requests.post('http://117.78.23.14:5001/translate',data=dic_info_,headers=headers)
log.info(req.text)
\ No newline at end of file
req = requests.post('http://117.78.23.14:5000/translate',data=dic_info_,headers=headers)
log.info(req.text)
# import re, datetime
#
#
# def paserTime(publishtime):
# timeType = ['年前', '月前', '周前', '前天', '昨天', '天前', '今天', '小时前', '分钟前']
# current_datetime = datetime.datetime.now()
# publishtime = publishtime.strip()
# print(publishtime)
#
# try:
# if '年前' in publishtime:
# numbers = re.findall(r'\d+', publishtime)
# day = int(numbers[0])
# delta = datetime.timedelta(days=365 * day)
# publishtime = current_datetime - delta
# elif '月前' in publishtime:
# numbers = re.findall(r'\d+', publishtime)
# day = int(numbers[0])
# delta = datetime.timedelta(months=day)
# publishtime = current_datetime - delta
# elif '周前' in publishtime:
# numbers = re.findall(r'\d+', publishtime)
# day = int(numbers[0])
# delta = datetime.timedelta(weeks=day)
# publishtime = current_datetime - delta
# elif '天前' in publishtime:
# numbers = re.findall(r'\d+', publishtime)
# day = int(numbers[0])
# delta = datetime.timedelta(days=day)
# publishtime = current_datetime - delta
# elif '前天' in publishtime:
# delta = datetime.timedelta(days=2)
# publishtime = current_datetime - delta
# elif '昨天' in publishtime:
# current_datetime = datetime.datetime.now()
# delta = datetime.timedelta(days=1)
# publishtime = current_datetime - delta
# elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime:
# if '小时' in publishtime:
# hour = publishtime.split("小时")[0]
# else:
# hour = 0
# if hour != 0:
# min = publishtime.split("小时")[1].split("分钟")[0]
# else:
# min = publishtime.split("分钟")[0]
#
# delta = datetime.timedelta(hours=int(hour), minutes=int(min))
# publishtime = current_datetime - delta
# elif '年' in publishtime and '月' in publishtime:
# time_format = '%Y年%m月%d日'
# publishtime = datetime.datetime.strptime(publishtime, time_format)
# elif '月' in publishtime and '日' in publishtime:
# current_year = current_datetime.year
# time_format = '%Y年%m月%d日'
# publishtime = str(current_year) + '年' + publishtime
# publishtime = datetime.datetime.strptime(publishtime, time_format)
# except Exception as e:
# print('时间解析异常!!')
# return publishtime
#
# if __name__ == "__main__":
# publishtime_ = '1小时17分钟前'
# publish_time = paserTime(publishtime_).strftime("%Y-%m-%d")
# print(publish_time)
\ No newline at end of file
# -*- coding: utf-8 -*-
......@@ -59,12 +59,13 @@ def newsdata(art_content_dict,art_type_dict,dic_lables):
try:
del post_dict['is_repeat']
del post_dict['tags']
del post_dict['title_pd']
# 发送kafka
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
kafka_result = producer.send("research_center_fourth",
json.dumps(post_dict, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
# kafka_result = producer.send("research_center_fourth",
# json.dumps(post_dict, ensure_ascii=False).encode('utf8'))
#
# print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
......@@ -122,21 +123,22 @@ def get_content():
except:
print('请求错误1')
continue
for data_dict in data_list[::-1]:
# for data_dict in data_list[::-1]:
for data_dict in data_list[:1]:
article_id = data_dict['article_id']
print(type(article_id))
is_article_id = db_storage.find_one({'id': f"1534423014825668610{article_id}"})
if is_article_id:
continue
title = data_dict['title']
# is_article_id = db_storage.find_one({'id': f"1534423014825668610{article_id}"})
# if is_article_id:
# continue
title = data_dict['title'] # 采集到的标题
pub_time = data_dict['input_date']
current_date = datetime.now()
yesterday = current_date - timedelta(days=1)
# 格式化日期
yesterday_date = yesterday.strftime("%Y-%m-%d")
if pub_time <= yesterday_date:
continue
title_dict_list = db_storage.find({'title': title, 'is_repeat': ''})
# if pub_time <= yesterday_date:
# continue
title_dict_list = db_storage.find({'title_pd': title.replace(' ', ''), 'is_repeat': ''}) # 如果找到一样的标题 判断三天之内是否有重复的
is_repeat = ''
for title_dict in title_dict_list:
pub_time1 = title_dict['publishDate']
......@@ -152,6 +154,14 @@ def get_content():
doc_href = pq(href_text)
content_html1 = str(doc_href('.d2txt_con.clearfix'))
content_html2 = str(doc_href('.editor.clearfix'))
#rtodo: 找到标题并拼接
title1 = doc_href('.d2txt.clearfix h2').text()
title2 = doc_href('.d2txt.clearfix h1').text()
title3 = doc_href('.d2txt.clearfix h3').text()
if title1 == '' and title3 == '':
title_final = title
else:
title_final = title1 + ' ' + title2 + ' ' + title3
except:
print('请求错误2')
continue
......@@ -170,7 +180,8 @@ def get_content():
origin = data_dict['origin_name']
a_dict = {
'id': "1534423014825668610" + article_id,
'title': title,
'title': title_final,
'title_pd': title,
'author': '',
'origin': origin,
'contentWithTag': content_html,
......@@ -183,6 +194,7 @@ def get_content():
}
art_content_dict[article_id] = a_dict
db_a_dict = a_dict.copy()
db_a_dict['title_pd'] = title.replace(' ', '')
db_storage.insert_one(db_a_dict)
if is_repeat == '':
print(href)
......
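The `title_pd` lookup above is meant to flag a story as a repeat when another article with the same space-stripped title was published within the last three days. A hedged sketch of that check, assuming `publishDate` is stored as a `%Y-%m-%d...` string; the field names follow the code above, while the returned marker value is an assumption.

```python
from datetime import datetime, timedelta


def check_repeat(db_storage, title, pub_time):
    """Return a non-empty marker if the same normalized title appeared within the last 3 days."""
    title_pd = title.replace(' ', '')
    three_days_ago = (datetime.strptime(pub_time[:10], '%Y-%m-%d') - timedelta(days=3)).strftime('%Y-%m-%d')
    for prior in db_storage.find({'title_pd': title_pd, 'is_repeat': ''}):
        if prior['publishDate'][:10] >= three_days_ago:
            return 'repeat'  # assumed marker value
    return ''
```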
"""
"""
Fetch all the titles from ES
"""
import redis
from elasticsearch import Elasticsearch
from base import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'subjectdatabase'
def queryatt(self,index_name,pnum):
body = {
"query": {
"match": {
"subjectId": "1534423014825668610"
}
},
"sort": [
{
"publishDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200,
"from": pnum
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.title',
'hits.hits._source.origin',
'hits.hits._source.publishDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
# log.info(result)
return result
if __name__ == '__main__':
es_method = EsMethod()
# 连接Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
for i in range(56):
result = es_method.queryatt('subjectdatabase', i*200)
total = result['hits']['total']['value']
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
continue
log.info(f'---第{i}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
origin = mms['_source']['origin']
pub_time = mms['_source']['publishDate']
try:
log.info(f'{id}--{title}--{origin}--')
item = id + "|" + pub_time
# r.lrem(f'XJPdatabase:id_2', 0, item)
r.lpush(f'XJPdatabase:id', item)
except:
continue
"""
"""
Operations on the titles:
1. Strip spaces from titles that contain them
2. Exact-match deduplication
3. Jaccard-similarity deduplication
"""
#将数据读到csv中
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
df = pd.read_excel('./test2.xlsx')
print(df)
# 去掉空格
df['title_1'] = df['title'].str.replace(' ', '')
print(df['title_1'])
#精确去重
# df_drop = df.drop_duplicates(subset=['title'], keep='first')
# duplicates = df[df.duplicated('title_1', keep=False)]['title_1']
#杰卡德相似度去重
# from sklearn.feature_extraction.text import TfidfVectorizer
# vectorizer = TfidfVectorizer(analyzer='word',ngram_range=(1, 2),max_features=1000)
# tfidf_matrix = vectorizer.fit_transform(df['title'])
#
# dist = 1 - cosine_similarity(tfidf_matrix)
#
# df['similar'] = dist.mean(axis=1)
#
# df_drop = df.drop_duplicates(subset=['title'],keep='last')
# df_drop.to_csv('D:/data/titles_drop.csv',index=False)
\ No newline at end of file
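Step 3 of the docstring (Jaccard-similarity deduplication) is only stubbed out above with a commented-out TF-IDF/cosine attempt. A hedged sketch of a character-bigram Jaccard check that could drive the near-duplicate filter; the 0.8 threshold and the example titles are assumptions.

```python
def jaccard(a: str, b: str, n: int = 2) -> float:
    """Jaccard similarity between the character n-gram sets of two titles."""
    grams_a = {a[i:i + n] for i in range(len(a) - n + 1)}
    grams_b = {b[i:i + n] for i in range(len(b) - n + 1)}
    if not grams_a or not grams_b:
        return 0.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)


def drop_near_duplicates(titles, threshold=0.8):
    # O(n^2) pairwise check; fine for a few thousand titles, too slow beyond that
    kept = []
    for t in titles:
        t_norm = t.replace(' ', '')
        if all(jaccard(t_norm, k) < threshold for k in kept):
            kept.append(t_norm)
    return kept


# Example: the second title is dropped as a near duplicate of the first
print(drop_near_duplicates(['国资委召开中央企业负责人会议', '国资委 召开中央企业负责人会议!']))
```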
# -*- coding: utf-8 -*-
......@@ -163,9 +163,8 @@ class BaiduTaskJob(object):
return kwList
def runSpider(self,kwmsg):
searchkw=kwmsg['kw']
def runSpider(self,kwmsg,com_name):
searchkw=com_name + kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
baiduSpider=BaiduSpider(searchkw,wordsCode,sid)
......@@ -186,7 +185,15 @@ class BaiduTaskJob(object):
finally:
baiduSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw)
import random
def get_comname(self):
# todo:读取redis里的企业名称添加到关键词上
com_name = baseCore.redicPullData('SousuoBaidu:companyname')
if com_name:
return com_name
else:
logger.info('====已无企业===')
return None
def PutWords(codeList, r):
......@@ -208,50 +215,17 @@ if __name__ == '__main__':
baseCore=BaseCore()
logger=baseCore.getLogger()
# ss='(中国机床工具工业协会|中国内燃机工业协会|中国机电工业价格协会|中国机械电子兵器船舶工业档案学会|中国仪器仪表行业协会|中国工程机械工业协会|中国文化办公设备制造行业协会|中国机械工业金属切削刀具技术协会|中国机械工业教育协会|中国汽车工业协会|中国机械通用零部件工业协会|中国环保机械行业协会|中国模具工业协会|中国机械工业勘察设计协会|中国机械制造工艺协会|中国机械工业审计学会|中国轴承工业协会|中国机电一体化技术应用协会|中国机械工程学会|中国液压气动密封件工业协会|中国铸造协会|中国通用机械工业协会|中国锻压协会|中国制冷空调工业协会|中国热处理行业协会|中国电工技术学会|中国仪器仪表学会|中国石油和石油化工设备工业协会|中国表面工程协会|中国食品和包装机械工业协会|中国焊接协会|中国汽车工程学会|中国塑料机械工业协会|中国机械工业企业管理协会|中国印刷及设备器材工业协会|中国机械工业质量管理协会|中国电器工业协会|中国机械工业安全卫生协会|中国重型机械工业协会|中国机械工业标准化技术协会|中国机械工业职工思想政治工作研究会|中国农业机械工业协会|中国机电装备维修与改造技术协会 |机械工业信息研究院|机械工业教育发展中心|机械工业经济管理研究院|机械工业信息中心|机械工业人才开发服务中心|机械工业北京电工技术经济研究所|机械工业技术发展基金会|机械工业哈尔滨焊接技术培训中心|机械工业仪器仪表综合技术经济研究所)+(私收会费|私吞|肆意牟利|损失浪费|索贿|贪财|贪官污吏|贪污|违背组织原则|违法|违纪|为官不廉|为政擅权|窝案|舞弊|泄露国家机密|信鬼神|性关系|虚假信息|虚假招标|隐瞒不报|隐瞒真相|营私|鬻爵|主动投案|资产流失|钻空子|钻漏洞|被调查|被双开|不担当|不老实|不良影响|不正当|不作为|超标准建设|超标准装修|吃空饷|吃拿卡要|渎职|对党不忠诚|非法批地|腐败|腐虫|腐化堕落|公车私用|公费开销|公款吃喝|公款出境|公款旅游|勾结|官迷心窍|好色|回扣|贿赂|挤占挪用|纪律审查|监察调查|监守自盗|践踏法律|接受审查调查|截留克扣|开除党籍|开除公职|抗议|利欲熏心|敛财|乱摊派|乱作为|落马|落网|买官|买卖审批权限|卖官|谋取暴利|谋取私利|目无法纪|幕后交易|弄虚作假|挪用公款|骗取|钱色交易|潜规则|侵害权益|侵吞公款|侵占挪用|圈子文化|权利扭曲|权钱交易|权色交易|山头主义|涉案|生活糜烂|生活奢靡|失察|失管|收送|受贿|双规|双开|私分|私人会所|私设小金库|负面|下降|违规|不利|亏损|上诉|不法|不良名单|停职|公开谴责|公诉|内幕交易|刑事拘留|刑事责任|刑拘|判决|判刑|判赔|司法处置|合同纠纷|处分|处罚|强制执行|仲裁|伪造|伪造公章|投案|投诉|拘留|接受调查|控诉|查封|涉嫌|涉诉监察调查|纠纷|经营异常名录|缉捕|罚单|罚款|罚金|罪犯|自首|获刑|行贿|警示函|贪腐|违约金|追究刑责|造假|逮捕|非法|非法集资判决书|申诉|纠纷|通报|开除|留党察看|追债|逃债|资产负债率|情色交易|搞权钱|曝光|黑料|重罚|虚假报告|侵犯)'
# keymsglist=baiduTaskJob.getkeywords(ss)
# print(keymsglist)
# 创建Redis连接
# r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
# codeList = [
# 'KW-20220809-0004',
# 'KW-20220524-0004',
# 'KW-20220809-0005',
# 'KW-20220824-0001',
# 'KW-20220809-0002',
# 'KW-20220809-0003',
# 'KW-20220826-0001',
# 'KW-20220602-0003',
# 'KW-20220602-0002',
# 'KW-20220113-0007',
# 'KW-20220113-0006',
# 'KW-20220108-0004',
# 'KW-20220113-0004'
# ]
# PutWords(codeList, r)
while True:
try:
# codeid = redicPullData("BaiduSearch:WordsCode", r)
# if codeid:
# pass
# else:
# PutWords(codeList, r)
# #codeList.append('KW-20220108-0004')
# logger.info(f'开始采集{codeid}')
com_name = baiduTaskJob.get_comname()
if com_name:
pass
else:
break
codeList = [
# 'KW-20220809-0004',
# 'KW-20220524-0004',
# 'KW-20220809-0005',
# 'KW-20220824-0001',
# 'KW-20220809-0002',
# 'KW-20220809-0003',
'KW-20220826-0001',
# 'KW-20220602-0003',
# 'KW-20220602-0002',
# 'KW-20220113-0007',
# 'KW-20220113-0006',
# 'KW-20220108-0004',
# 'KW-20220113-0004'
'KW-20240206-0001',
'KW-20240206-0002',
'KW-20240206-0003'
]
for codeid in codeList:
try:
......@@ -271,7 +245,7 @@ if __name__ == '__main__':
# 创建一个线程池,指定线程数量为4
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# 提交任务给线程池,每个任务处理一个数据
results = [executor.submit(baiduTaskJob.runSpider, data) for data in kwList]
results = [executor.submit(baiduTaskJob.runSpider, data,com_name) for data in kwList]
# 获取任务的执行结果
for future in concurrent.futures.as_completed(results):
try:
......
# -*- coding: utf-8 -*-
......@@ -7,6 +7,7 @@ import logbook
import logbook.more
# 核心工具包
import pymysql
import redis
from tqdm import tqdm
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
......@@ -215,6 +216,8 @@ class BaseCore:
except :
pass
def __init__(self):
# 连接到Redis
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor()
......@@ -288,65 +291,11 @@ class BaseCore:
def getRandomUserAgent(self):
return random.choice(self.__USER_AGENT_LIST)
# 获取代理
def get_proxy(self):
sql = "select proxy from clb_proxy"
self.__cursor_proxy.execute(sql)
proxy_lists = self.__cursor_proxy.fetchall()
self.__cnx_proxy.commit()
ip_list = []
for proxy_ in proxy_lists:
ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
proxy_list = []
for str_ip in ip_list:
str_ip_list = str_ip.split('-')
proxyMeta = "http://%(host)s:%(port)s" % {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
proxy = {
"http": proxyMeta,
"https": proxyMeta
}
proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)]
def get_proxy(self):
ip_list = []
with self.__cursor_proxy as cursor:
sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
print(sql_str)
cursor.execute(sql_str)
rows = cursor.fetchall()
for row in tqdm(rows):
str_ip = row[0]
str_ip_list = str_ip.split('-')
proxyMeta = "http://%(host)s:%(port)s" % {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
proxy = {
"HTTP": proxyMeta,
"HTTPS": proxyMeta
}
ip_list.append(proxy)
return ip_list
def get_proxyIPPort(self):
ip_list = []
with self.__cursor_proxy as cursor:
sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
print(sql_str)
cursor.execute(sql_str)
rows = cursor.fetchall()
for row in tqdm(rows):
str_ip = row[0]
str_ip_list = str_ip.split('-')
proxy = {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
ip_list.append(proxy)
return ip_list
\ No newline at end of file
# 从Redis的List中获取并移除一个元素
def redicPullData(self, key):
try:
self.r.ping()
except:
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
item = self.r.lpop(key)
return item.decode() if item else None
\ No newline at end of file
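For context, the new `redicPullData` helper is typically consumed in a worker loop like the jobs elsewhere in this changeset; a minimal sketch, assuming the `name|social_code` item format pushed to `SousuoBaidu:companyname` above (the key name is just an example).

```python
from base import BaseCore

baseCore = BaseCore.BaseCore()

while True:
    item = baseCore.redicPullData('SousuoBaidu:companyname')  # example key
    if item is None:
        # Queue drained; stop here (some jobs above sleep and retry instead)
        break
    name, social_code = item.split('|', 1)
    print(name, social_code)
```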