Commit 5940b41f  Author: 薛凌堃

11/28

Parent e00e2f5b
-# Core toolkit
+# REITs topic core toolkit
+import json
import os
import random
import socket
@@ -19,7 +20,7 @@ import pymysql
from DBUtils.PooledDB import PooledDB
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
+from kafka import KafkaProducer
from obs import ObsClient
import fitz
@@ -450,8 +451,10 @@ class BaseCore:
    # def doc_page(self,file_path):
    #     doc = Document(file_path)
    #     return len(doc.sections)
+    def deliteATT(self, id):
+        # remove an attachment row by primary key; parameterized query avoids SQL injection
+        delitesql = "delete from clb_sys_attachment where id = %s"
+        self.cursor_.execute(delitesql, (id,))
+        self.cnx_.commit()
    def secrchATT(self, item_id, file_name, type_id, order_by):
        sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
@@ -549,6 +552,24 @@ class BaseCore:
        return retData
+    def sendkafka(self, post_data, topic):
+        try:
+            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
+            kafka_result = producer.send(topic, json.dumps(post_data, ensure_ascii=False).encode('utf8'))
+            print(kafka_result.get(timeout=10))
+            dic_result = {
+                'success': 'true',
+                'message': 'operation succeeded',
+                'code': '200',
+            }
+            self.getLogger().info(dic_result)
+            return True
+        except Exception:
+            return False
......
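Note: a minimal usage sketch for the new sendkafka helper, assuming a BaseCore instance named baseCore; the topic name and payload below are illustrative, not from the commit:

    dic_info = {'title': 'demo policy', 'sourceAddress': 'https://example.com/notice/1'}
    if not baseCore.sendkafka(dic_info, 'policy'):
        log.error('kafka send failed; keep the record for retry')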
import os
@@ -19,7 +19,7 @@ import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
filepath = "data/"
+topic = 'policy'
class Policy():
    def getrequest_soup(self, headers, url):
        req = requests.get(headers=headers, url=url)
@@ -275,6 +275,9 @@ def reform(wb, file_path):
            '附件链接': fu_jian_href,
        }
        DataList.append(dic_info)
+        try:
+            baseCore.sendkafka(dic_info, topic)
+        except Exception:
            sheet_name = "国家发展和改革委员会"  # National Development and Reform Commission
            if sheet_name in wb.sheetnames:
......
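Review note: sendkafka already catches every exception internally and returns False, so this except branch is hard to reach; keying the Excel fallback off the boolean return value (if not baseCore.sendkafka(...)) would trigger it more reliably.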
@@ -43,7 +43,7 @@ class EsMethod(object):
                "must": [
                    {
                        "match": {
-                            "type": "0"
+                            "type": "1"
                        }
                    }
                ]
@@ -115,7 +115,7 @@ def main(page, p, esMethod):
        attid = mms['_source']['attachmentIds'][0]
        log.info(f'{id}-{attid}--{title}--{sourceAddress}---')
-        selects = secrchATT('4', attid)
+        selects = secrchATT('1', attid)
        if selects:
            pass
        else:
......
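Note: the secrchATT called here takes two arguments ('1', attid), so it appears to be this script's own lookup helper rather than the four-argument BaseCore.secrchATT above; the switch from '4' to '1' lines the attachment check up with the "type": "1" filter in the ES query.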
# coding:utf-8
import time
import urllib3
import BaseCore
from elasticsearch import Elasticsearch
import threading
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es = Elasticsearch([{'host': '114.115.215.250', 'port': '9700'}], http_auth=('elastic', 'zzsn9988'), timeout=600)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
r = baseCore.r
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_11 = baseCore.cnx_
cursor_11 = baseCore.cursor_
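# Verification script: for every report attachment row (type_id = 1) in MySQL, confirm
# that a matching document exists in the researchreportdata ES index.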
def getList():
    sql = 'Select id,item_id,year from clb_sys_attachment where type_id = 1'
    cursor_11.execute(sql)
    datas = cursor_11.fetchall()
    total = len(datas)
    # chunk size for five worker threads; max(..., 1) avoids a zero range step when total < 5
    page = max(total // 5, 1)
    datas_lists = [list(datas[i:i + page]) for i in range(0, len(datas), page)]
    return datas_lists
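# For each (id, item_id, year) row: query ES by credit code, type and year, then check the attachment id.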
def process_item(datas_list):
    for datas in datas_list:
        sid = datas[0]
        xydm = datas[1]
        year = datas[2]
        # skip bad rows; a return here would drop the rest of the chunk
        if not xydm or xydm == 'None':
            log.error(f'{sid}===missing credit code')
            continue
        if not year or year == 'None':
            log.error(f'{sid}===missing year')
            continue
        # match by company credit code (labels.relationId), report type and year, newest first
        body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "nested": {
                                "path": "labels",
                                "query": {
                                    "match": {
                                        "labels.relationId": f"{xydm}"
                                    }
                                }
                            }
                        },
                        {
                            "term": {
                                "type.keyword": {
                                    "value": "1"
                                }
                            }
                        },
                        {
                            "term": {
                                "year.keyword": {
                                    "value": f"{year}"
                                }
                            }
                        }
                    ]
                }
            },
            "sort": [
                {
                    "publishDate": {
                        "order": "desc"
                    }
                }
            ],
            "track_total_hits": True,
            "size": 10
        }
        res = es.search(index='researchreportdata', body=body)
        if len(res['hits']['hits']) == 0:
            log.error(f'{xydm}==={year}===not found in ES')
        for hit in res['hits']['hits']:
            try:
                sid_ = hit['_source']['attachmentIds'][0]
            except (KeyError, IndexError):
                log.error(f'{xydm}==={year}===no attachment id on the ES hit')
                # skip this hit; returning would abort the rest of the chunk
                continue
            if str(sid) in str(sid_):
                log.info(f'{xydm}==={year}===found')
            else:
                log.error(f'{xydm}==={year}===not found; other sid found: {sid_}')
        time.sleep(2)
threads = []
datas_lists = getList()
for datas_list in datas_lists:
    t = threading.Thread(target=process_item, args=(datas_list,))
    threads.append(t)
    t.start()
# wait for all threads to finish
for t in threads:
    t.join()
es.transport.close()
cursor_11.close()
cnx_11.close()
baseCore.close()
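For debugging, the thread fan-out above can be swapped for a synchronous run over a single chunk (hypothetical variant using the same helpers):

    chunks = getList()
    process_item(chunks[0])  # exercise just the first chunk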
import os
@@ -235,10 +235,12 @@ def spider(browser, code, social_code, com_name):
        year = publishDate[:4]
        newsUrl = 'https://np-info.eastmoney.com/pc/notice/?art_code=' + li.find('a')['data-code']
        title = li.find('a').text
-        if ifInstert(com_name, social_code, title):
+        if ifInstert(com_name, social_code, newsUrl):
            pass
        else:
            continue
+        time.sleep(1)
        browser2 = createDriver()
        browser2.get(newsUrl)
        wait = WebDriverWait(browser2, 30)
@@ -247,7 +249,8 @@ def spider(browser, code, social_code, com_name):
        soup_news = BeautifulSoup(page_source, 'html.parser')
        contentWithTag = soup_news.find('div', id='render-html')
        content = contentWithTag.text
+        if len(content) < 10:
+            continue
        # check for attachments
        try:
            browser2.find_element(By.CLASS_NAME, 'download-list').click()
@@ -374,7 +377,7 @@ if __name__ =='__main__':
        start_time = time.time()
        # fetch enterprise info
        # social_code = baseCore.redicPullData('NoticeEnterprise:ggqy_socialCode_add')
-        social_code = 'ZZSN23030800000022'
+        social_code = '91330000747735638J'
        if not social_code:
            time.sleep(20)
            continue
@@ -393,6 +396,7 @@ if __name__ =='__main__':
            gonggao_info(dic_info)
        except:
            log.info(f'-----error:{com_name}----{social_code}------')
+            break
......
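Note: with the hardcoded social_code and the new break in the error handler, the main loop now runs as a single-company, stop-on-error pass, which looks like a debugging configuration; re-enabling the commented redicPullData line restores the Redis-fed queue.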