Commit 639117a7 by 薛凌堃

11.30

Parent 3fdea62f
# REITs Topic Core Toolkit
@@ -522,7 +522,7 @@ class BaseCore:
                response = requests.get(file_href, headers=headers, verify=False, timeout=20)
                file_size = int(response.headers.get('Content-Length'))
                break
-            except:
+            except Exception as e:
                time.sleep(3)
                continue
        for i in range(0, 3):
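The `except Exception as e:` change tightens the bare `except` in the download retry loop. For reference, that pattern amounts to the following self-contained sketch (the helper name `fetch_file_size` is hypothetical; `file_href` and `headers` stand in for the caller's values):

```python
import time
import requests

def fetch_file_size(file_href, headers, attempts=3):
    """Sketch of the bounded-retry download above (assumed helper, not
    part of BaseCore). Returns the Content-Length, or None on failure."""
    for _ in range(attempts):
        try:
            response = requests.get(file_href, headers=headers,
                                    verify=False, timeout=20)
            # Content-Length may be absent; int(None) raises and triggers a retry
            return int(response.headers.get('Content-Length'))
        except Exception:
            time.sleep(3)  # back off before the next attempt
            continue
    return None
```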
@@ -538,7 +538,10 @@ class BaseCore:
            retData['state'] = True
            retData['path'] = result['body']['objectUrl'].split('.com')[1]
            retData['full_path'] = unquote(result['body']['objectUrl'])
-            retData['file_size'] = self.convert_size(file_size)
+            try:
+                retData['file_size'] = self.convert_size(file_size)
+            except:
+                retData['file_size'] = ''
            retData['create_time'] = time_now
        except Exception as e:
            print(f'error:{e}')
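The new try/except guards the case where `file_size` was never assigned (every download attempt failed) or `Content-Length` was missing. A hypothetical stand-in for `convert_size` shows why the call can raise:

```python
import math

def convert_size(size_bytes):
    """Hypothetical stand-in for BaseCore.convert_size: renders a byte
    count as a human-readable string. Raises TypeError on None, which
    is why the call above is wrapped in try/except."""
    if size_bytes == 0:
        return '0B'
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    i = min(int(math.log(size_bytes, 1024)), len(units) - 1)
    return f'{round(size_bytes / 1024 ** i, 2)}{units[i]}'
```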
@@ -546,20 +549,19 @@ class BaseCore:
        return retData

    def sendkafka(self, post_data, topic):
-        try:
-            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
-            kafka_result = producer.send(topic, json.dumps(post_data, ensure_ascii=False).encode('utf8'))
-            print(kafka_result.get(timeout=10))
-            dic_result = {
-                'success': 'true',
-                'message': '操作成功',
-                'code': '200',
-            }
-            self.getLogger().info(dic_result)
-            return True
-        except:
-            return False
+        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
+        kafka_result = producer.send(topic, json.dumps(post_data, ensure_ascii=False).encode('utf8'))
+        print(kafka_result.get(timeout=10))
+        dic_result = {
+            'success': 'true',
+            'message': '操作成功',
+            'code': '200',
+        }
+        self.getLogger().info(dic_result)
+        return True
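With the try/except removed, `sendkafka` now lets failures propagate to the caller, which the scraper below relies on to roll back uploaded attachments. A minimal sketch of the same send path as a standalone function, assuming the kafka-python client:

```python
import json
from kafka import KafkaProducer

def send_one(post_data, topic, bootstrap='114.115.159.144:9092'):
    """Sketch of the simplified send path: any failure now raises to
    the caller instead of being swallowed by a bare except."""
    producer = KafkaProducer(bootstrap_servers=[bootstrap],
                             api_version=(2, 0, 2))
    future = producer.send(
        topic, json.dumps(post_data, ensure_ascii=False).encode('utf8'))
    record_metadata = future.get(timeout=10)  # raises KafkaError on failure
    producer.flush()
    return record_metadata
```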
......
import os
import time
from datetime import datetime
from urllib.parse import urljoin
@@ -6,10 +7,21 @@ import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
-from base import BaseCore
+import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
+from reits import Policy
+policy = Policy()
+topic = 'policy'
+webname = '天津市人民政府'
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
headers = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Encoding': 'gzip, deflate, br',
@@ -75,9 +87,8 @@ def getFjContent(url):
    return req.content

-def getContent(num, title, pub_time, origin, organ, url, pub_hao, summary, ):
-    fjhref_list = ''
-    fjtitle_list = ''
+def getContent(num, title, pub_time, origin, organ, url, pub_hao, summary):
+    id_list = []
    soup = getSoup(url)
    url_ = url.split('/')[-1]
    soup = paserUrl(soup, url.replace(url_, ''))
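`paserUrl` (name kept as in the repo) appears to rewrite relative links against the page's base URL before the attachments are walked. A sketch of its assumed behavior:

```python
from urllib.parse import urljoin
from bs4 import BeautifulSoup

def paserUrl(soup, base_url):
    """Assumed behavior of the repo's paserUrl helper: rewrite relative
    href/src attributes into absolute URLs against base_url."""
    for tag in soup.find_all(True):
        if tag.get('href'):
            tag['href'] = urljoin(base_url, tag['href'])
        if tag.get('src'):
            tag['src'] = urljoin(base_url, tag['src'])
    return soup

# Example with a hypothetical base URL:
soup = BeautifulSoup('<a href="detail.html">x</a>', 'html.parser')
print(paserUrl(soup, 'https://www.tj.gov.cn/zwgk/'))
```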
@@ -102,55 +113,84 @@ def getContent(num, title, pub_time, origin, organ, url, pub_hao, summary, ):
        a_list = contentWithTag.find('div', class_='qt-attachments').find_all('a')
        for a in a_list:
            href = a.get('href')
-            fjhref_list += href + '\n'
            category = os.path.splitext(href)[1]
-            fj_title = f'{num}-{pub_time}-{a.text.lstrip().strip()}'
+            fj_title = a.text.lstrip().strip()
+            if '<' in fj_title or '>' in fj_title:
+                fj_title = fj_title.replace('<', '').replace('>', '')
            if category not in fj_title:
                fj_title = fj_title + category
-            fjtitle_list += fj_title + '\n'
-            fjcontent = getFjContent(href)
-            file = f'./相关政策/天津市人民政府/政策文件/{fj_title}'
-            with open(file, 'wb') as f:
-                f.write(fjcontent)
-            log.info(f'{title}===附件下载成功')
-    except:
-        pass
-    try:
-        contentWithTag.find('div', class_='qt-attachments').decompose()
+            att_id, full_path = policy.attuributefile(fj_title, href, num, pub_time)
+            if att_id:
+                id_list.append(att_id)
+            a['href'] = full_path
    except:
        pass
+    # try:
+    #     contentWithTag.find('div', class_='qt-attachments').decompose()
+    # except:
+    #     pass
    content = contentWithTag.text.lstrip().strip()
-    fjtitle_list = fjtitle_list.lstrip().strip()
-    fjhref_list = fjhref_list.lstrip().strip()
-    data = [num, title, pub_time, origin, url, pub_time, organ, pub_hao, summary, content, fjtitle_list, fjhref_list]
-    return data
+    contentWithTag_str = str(contentWithTag)
+    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+    dic_info = {
+        'attachmentIds': id_list,
+        'author': '',
+        'content': content,
+        'contentWithTag': contentWithTag_str,
+        'deleteFlag': 0,
+        'id': '',
+        'title': title,
+        'publishDate': pub_time,
+        'origin': origin,
+        'sourceAddress': url,
+        'writtenDate': pub_time,
+        'organ': organ,
+        'topicClassification': '',
+        'issuedNumber': pub_hao,
+        'summary': summary,
+        'createDate': time_now,
+        'sid': '1729041400674045953',
+    }
+    try:
+        baseCore.sendkafka(dic_info, topic)
+        baseCore.r.sadd('REITs::' + webname, url)
+        log.info(f'采集成功--{title}--{url}')
+    except Exception as e:
+        for att_id in id_list:
+            baseCore.deliteATT(att_id)
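The rewritten `getContent` replaces local file downloads and string accumulation with an upload-then-publish flow: attachments are registered via `policy.attuributefile`, and if the Kafka send fails, the already-uploaded attachments are deleted so no orphans remain. The pattern, as a self-contained sketch with assumed callable parameters standing in for `baseCore.sendkafka` and `baseCore.deliteATT`:

```python
def publish_with_rollback(dic_info, id_list, send, delete_attachment):
    """Sketch of the send-or-rollback pattern above (assumed signatures):
    publish the record; on failure, delete the attachments that were
    already uploaded."""
    try:
        send(dic_info)
        return True
    except Exception:
        for att_id in id_list:
            delete_attachment(att_id)  # mirrors baseCore.deliteATT(att_id)
        return False
```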
def doJob():
-    if not os.path.exists('./相关政策/天津市人民政府/政策文件'):
-        os.makedirs('./相关政策/天津市人民政府/政策文件')
-    data_list = []
+    # if not os.path.exists('./相关政策/天津市人民政府/政策文件'):
+    #     os.makedirs('./相关政策/天津市人民政府/政策文件')
+    # data_list = []
    total = getTotal()
    num = 1
    for page in range(1, total + 1):
        data_json = getJson(page)
        for i in range(len(data_json)):
            title = data_json[i]['title']
-            pub_time = datetime.strptime(data_json[i]['trs_time'], "%Y-%m-%dT%H:%M:%S.%f%z").date()
+            parsed_date = datetime.strptime(data_json[i]['trs_time'], '%Y-%m-%dT%H:%M:%S.%f%z')
+            # Format the parsed datetime into the target string form
+            pub_time = parsed_date.strftime('%Y-%m-%d %H:%M:%S')
            origin = data_json[i]['trs_site']
            organ = data_json[i]['department']
            href = data_json[i]['url']
+            # Skip URLs already collected (dedup by link)
+            is_member = baseCore.r.sismember('REITs::' + webname, href)
+            if is_member:
+                continue
            pub_hao = data_json[i]['wh']
            summary = ''
-            data = getContent(num, title, pub_time, origin, organ, href, pub_hao, summary)
-            data_list.append(data)
-            log.info(f'{title}===采集成功')
+            getContent(num, title, pub_time, origin, organ, href, pub_hao, summary)
            num += 1
-    df = pd.DataFrame(np.array(data_list))
-    df.columns = ['序号', '标题', '发布时间', '来源', '原文链接', '发文时间', '发文机构', '发文字号', '摘要', '正文', '附件名称', '附件连接']
-    df.to_excel('./天津市人民政府政策文件.xlsx', index=False)
+    # df = pd.DataFrame(np.array(data_list))
+    # df.columns = ['序号', '标题', '发布时间', '来源', '原文链接', '发文时间', '发文机构', '发文字号', '摘要', '正文', '附件名称', '附件连接']
+    # df.to_excel('./天津市人民政府政策文件.xlsx', index=False)
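Two behavior changes in `doJob`: the API's ISO-8601 timestamps are now normalized to `YYYY-MM-DD HH:MM:SS` strings, and already-collected URLs are skipped via a Redis set instead of accumulating rows for an Excel export. Both, as a sketch with an assumed local Redis connection standing in for `baseCore.r`:

```python
from datetime import datetime
import redis

r = redis.Redis(host='localhost', port=6379)  # assumed; the repo uses baseCore.r

def already_seen(webname, href):
    """Redis-set membership test mirroring the dedup check above."""
    return r.sismember('REITs::' + webname, href)

def normalize_trs_time(trs_time):
    """Convert the API's ISO-8601 timestamp into 'YYYY-MM-DD HH:MM:SS'."""
    parsed = datetime.strptime(trs_time, '%Y-%m-%dT%H:%M:%S.%f%z')
    return parsed.strftime('%Y-%m-%d %H:%M:%S')

print(normalize_trs_time('2023-11-30T09:15:00.000+08:00'))  # 2023-11-30 09:15:00
```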
if __name__ == '__main__':
......