Commit 5f3288f3  Author: XveLingKun

Merge remote-tracking branch 'origin/master'

import datetime
import time
import urllib.parse
import requests
from ClassTool import ClassTool
from BaseCore import BaseCore
baseTool = ClassTool()
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
    'Connection': 'keep-alive',
    'Host': 'flk.npc.gov.cn',
    'Referer': 'https://flk.npc.gov.cn/fl.html',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
    'X-Requested-With': 'XMLHttpRequest',
    'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}
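# These headers mimic the browser's XHR request to flk.npc.gov.cn (the national law database);
# the Host/Referer/X-Requested-With values appear to match what the site's own search page sends.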
def getDataJson(url):
    req = requests.get(url, headers=headers)
    req.encoding = req.apparent_encoding
    datasJson = req.json()['result']['data']
    totalSizes = req.json()['result']['totalSizes']
    req.close()
    return datasJson, totalSizes
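# Expected response shape (inferred from the fields accessed in this script, not an official schema):
# {"result": {"data": [{"id": ..., "title": ..., "office": ..., "publish": ..., "expiry": ...,
#                       "type": ..., "status": ..., "url": "./..."}, ...],
#             "totalSizes": <int>}}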
def getPdf(id_, title, publishDate):
    id_list = []
    url = 'https://flk.npc.gov.cn/api/detail'
    payload = {'id': id_}
    req = requests.post(url, headers=headers, data=payload)
    req.encoding = req.apparent_encoding
    datasJson = req.json()['result']['body']
    req.close()
    href = ''
    # pick the first WORD-format attachment listed in the detail response
    for dataJson in datasJson:
        if dataJson['type'] == 'WORD':
            href = 'https://wb.flk.npc.gov.cn' + dataJson['path']
            break
    if not href:
        log.error(f'{title}===附件链接获取失败')
        return ''
    # upload the attachment to OBS; bail out if the upload failed
    retData = baseCore.uptoOBS(href, '1699', title)
    if not retData['state']:
        return ''
    att_id, full_path = baseCore.tableUpdate(retData, '国务院文件', title, 0, publishDate)
    id_list.append(att_id)
    return id_list
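# uptoOBS and tableUpdate are in-house BaseCore helpers; retData is assumed to be a dict that
# carries at least a 'state' flag plus the fields tableUpdate needs to register the attachment.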
def getDic(title, office, publishDate, expiry, type, timeliness, href, id_):
    id_list = getPdf(id_, title, publishDate)
    if not id_list:
        log.error(f'{title}===附件下载失败')
        return ''
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    dic_news = {
        'attachmentIds': id_list,  # attachment ids
        'author': '',  # author
        'content': title,  # body text without tags
        'contentWithTag': '',  # body text with tags
        'createDate': now,  # creation time
        'deleteFlag': 0,  # delete flag (0 = default, 1 = deleted)
        'id': '',
        'labels': [{'relationId': "1788847783801794562", 'relationName': "国资国企法律法规", 'labelMark': "policy"}],
        # related label id / related label name / related label mark
        'origin': '',  # issuing authority of the policy
        'organ': office,  # drafting / enacting authority
        'topicClassification': '',  # policy document category
        'issuedNumber': '',  # document number
        'publishDate': publishDate,  # publication date of the policy / law
        'writtenDate': None,  # date of writing
        'implementDate': expiry,  # date of entry into force
        'sid': '1788838266435284993',  # information source id
        'sourceAddress': href,  # original link
        'summary': '',  # abstract
        'title': title,  # title
        'legalPrecedenceHierarchy': type,  # hierarchy of legal effect
        'effectiveness': timeliness,  # validity / timeliness status
    }
    return dic_news
def doJob():
    searchList = ['国有资产', '国资', '国有企业', '企业', '公司']
    for search in searchList:
        search_ = urllib.parse.quote(search)
        url = f'https://flk.npc.gov.cn/api/?type=&fgbt={search_}&searchType=title%3Baccurate%3B1&sortTr=f_bbrq_s%3Bdesc&gbrqStart=&gbrqEnd=&sxrqStart=&sxrqEnd=&page=1&size=10'
        datasJson, totalSizes = getDataJson(url)
        # integer page count: an exact multiple of 10 fills totalSizes // 10 pages,
        # otherwise one extra page is needed for the remainder
        if totalSizes % 10 == 0:
            totalPage = totalSizes // 10
        else:
            totalPage = totalSizes // 10 + 1
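        # e.g. totalSizes = 30 -> totalPage = 3; totalSizes = 31 -> totalPage = 4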
        for page in range(1, totalPage + 1):
            if page != 1:
                url = url.replace(f'&page={page - 1}', f'&page={page}')
                datasJson, totalSizes = getDataJson(url)
            for dataJson in datasJson:
                id_ = dataJson['id']
                title = dataJson['title']
                office = dataJson['office']
                publishDate = dataJson['publish']
                expiry = dataJson['expiry']
                type = dataJson['type']
                status = dataJson['status']
                if status == '1':
                    timeliness = '有效'
                elif status == '5':
                    timeliness = '已修改'
                elif status == '9':
                    timeliness = '已废止'
                elif status == '3':
                    timeliness = '尚未生效'
                else:
                    timeliness = ''  # unknown status code; avoid an undefined variable below
                href = dataJson['url'].replace('./', 'https://flk.npc.gov.cn/')
                is_href = baseTool.db_storage.find_one({'网址': href})
                if is_href:
                    log.info(f'{title}===已采集')
                    continue
                dic = getDic(title, office, publishDate, expiry, type, timeliness, href, id_)
                if dic:
                    flag = baseTool.sendKafka(dic)
                    if flag:
                        baseTool.save_data(dic)
                else:
                    log.error(f'{title}==={href}===获取失败')
                time.sleep(2)
if __name__ == '__main__':
    doJob()
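# ===== A second script follows: apparently a back-fill variant of the crawler above, which
# re-collects records queued in Redis and re-extracts the body text from the stored attachments =====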
import datetime
import json
import os
import re
import time
import urllib.parse
import pandas as pd
import pymongo
import requests
from bs4 import BeautifulSoup
from docx import Document
import io
from retry import retry
from win32com.client import Dispatch
from ClassTool import ClassTool
from BaseCore import BaseCore
baseTool = ClassTool()
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
    'Connection': 'keep-alive',
    'Host': 'flk.npc.gov.cn',
    'Referer': 'https://flk.npc.gov.cn/fl.html',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
    'X-Requested-With': 'XMLHttpRequest',
    'sec-ch-ua': '"Chromium";v="124", "Microsoft Edge";v="124", "Not-A.Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}
searchHeaders = {
    'Accept': '*/*',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
    'Connection': 'keep-alive',
    'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0',
    'X-Requested-With': 'XMLHttpRequest',
    'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Microsoft Edge";v="126"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin',
                                 password='ZZsn@9988').ZZSN['国务院_国资委_copy1']
@retry(tries=2, delay=10)
def searchDate(key):
    # look the title up on pkulaw.com to recover the publication / implementation dates
    url = 'https://www.pkulaw.com/law/chl'
    dataPost = f'Menu=law&Keywords={urllib.parse.quote(key)}&PreKeywords={urllib.parse.quote(key)}&SearchKeywordType=Title&MatchType=Exact&RangeType=Piece&Library=chl&ClassFlag=chl&GroupLibraries=&QuerySearchCondition=Title%2BExact%2BPiece%2B0&QueryOnClick=False&AfterSearch=True&RequestFrom=btnSearch&SearchInResult=&PreviousLib=chl&IsSynonymSearch=false&RecordShowType=List&ClassCodeKey=&IsSearchErrorKeyword=&FirstQueryKeywords={urllib.parse.quote(key)}&FirstQueryKeywordType=Title&IsSynonymSearch=false&X-Requested-With=XMLHttpRequest'
    req = requests.post(url, data=dataPost, headers=searchHeaders)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'lxml')
    divList = soup.find_all('div', class_='accompanying-wrap')
    for divTag in divList:
        itemList = divTag.select('> div.item')
        if len(divList) == 1 and len(itemList) == 1:
            # exactly one hit: use it directly
            itemTag = itemList[0]
            href = 'https://www.pkulaw.com' + itemTag.find('div', class_='col').find('div', class_='t').find('h4').find('a').get('href')
            req_ = requests.get(href, headers=searchHeaders)
            req_.encoding = req_.apparent_encoding
            soup_ = BeautifulSoup(req_.text, 'html.parser')
            liList = soup_.find('div', class_='fields').find('ul').find_all('li')
            publishDate = ''
            expiry = ''
            for liTag in liList:
                if '公布日期' in liTag.text:
                    publishDate = liTag.text.split('公布日期:')[1].strip()
                    publishDate = datetime.datetime.strptime(publishDate, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
                if '施行日期' in liTag.text:
                    expiry = liTag.text.split('施行日期:')[1].strip()
                    expiry = datetime.datetime.strptime(expiry, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
            return publishDate, expiry
        else:
            # several hits: only accept the item whose title matches the key exactly
            for itemTag in itemList:
                title = itemTag.find('div', class_='col').find('div', class_='t').find('h4').find('a').text
                href = 'https://www.pkulaw.com' + itemTag.find('div', class_='col').find('div', class_='t').find('h4').find('a').get('href')
                if title == key:
                    req_ = requests.get(href, headers=searchHeaders)
                    req_.encoding = req_.apparent_encoding
                    soup_ = BeautifulSoup(req_.text, 'html.parser')
                    liList = soup_.find('div', class_='fields').find('ul').find_all('li')
                    publishDate = ''
                    expiry = ''
                    for liTag in liList:
                        if '公布日期' in liTag.text:
                            publishDate = liTag.text.split('公布日期:')[1].strip()
                            publishDate = datetime.datetime.strptime(publishDate, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
                        if '施行日期' in liTag.text:
                            expiry = liTag.text.split('施行日期:')[1].strip()
                            expiry = datetime.datetime.strptime(expiry, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
                    return publishDate, expiry
    return '', ''
@retry(tries=2, delay=10)
def getDataJson(url):
    req = requests.get(url, headers=headers)
    req.encoding = req.apparent_encoding
    datasJson = req.json()['result']['data']
    totalSizes = req.json()['result']['totalSizes']
    req.close()
    return datasJson, totalSizes
def getPdf(id_, title, publishDate):
    id_list = []
    url = 'https://flk.npc.gov.cn/api/detail'
    payload = {'id': id_}
    req = requests.post(url, headers=headers, data=payload)
    req.encoding = req.apparent_encoding
    datasJson = req.json()['result']['body']
    req.close()
    href = ''
    # pick the first WORD-format attachment listed in the detail response
    for dataJson in datasJson:
        if dataJson['type'] == 'WORD':
            href = 'https://wb.flk.npc.gov.cn' + dataJson['path']
            break
    if not href:
        log.error(f'{title}===附件链接获取失败')
        return ''
    # upload the attachment to OBS; bail out if the upload failed
    retData = baseCore.uptoOBS(href, '1699', title)
    if not retData['state']:
        return ''
    att_id, full_path = baseCore.tableUpdate(retData, '国务院文件', title, 0, publishDate)
    id_list.append(att_id)
    return id_list
def is_member_containing_string(string):
    cursor = '0'
    while True:
        # walk the set with the SCAN command
        cursor, members = baseCore.r.sscan('flk_ok', cursor)
        for member in members:
            # check whether this member contains the given string
            if string in member.decode("utf-8"):
                return True
        if cursor == b'0' or cursor == 0:
            break
    return False
def is_member_containing_string_bucai(string):
    cursor = '0'
    while True:
        # walk the set with the SCAN command
        cursor, members = baseCore.r.sscan('flk_bucai', cursor)
        for member in members:
            # check whether this member contains the given string
            if string in member.decode("utf-8"):
                return True
        if cursor == b'0' or cursor == 0:
            break
    return False
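# Usage sketch (baseCore.r is assumed to be a redis-py client; the URL below is a hypothetical value):
#   baseCore.r.sadd('flk_bucai', 'https://flk.npc.gov.cn/detail2.html?xxx')
#   is_member_containing_string_bucai('detail2.html?xxx')  # -> True
# 'flk_bucai' holds the URLs queued for re-collection and 'flk_ok' the URLs already re-collected.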
def selectMongo(url):
    # look up the previously collected record by URL and return its stored attachment id list
    data = db_storage.find_one({'来源': '国资国企法律法规', '网址': url})
    id_list = data['附件id']
    return id_list
@retry(tries=2, delay=10)
def getReqContent(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0'
    }
    req = requests.get(url, headers=headers)
    content = req.content
    req.close()
    return content
def readDocx(byteStream, flg):
    # flg=True: byteStream is raw bytes and must be wrapped in BytesIO;
    # flg=False: byteStream is already a path or file-like object that python-docx can open
    contentWithTag = BeautifulSoup('', 'html.parser')
    if flg:
        byteStreamIo = io.BytesIO(byteStream)
    else:
        byteStreamIo = byteStream
    doc = Document(byteStreamIo)
    content = ''
    for para in doc.paragraphs:
        content += f'{para.text}\n'
        newTag = contentWithTag.new_tag('p')
        newTag.string = para.text
        contentWithTag.append(newTag)
    return content, str(contentWithTag)
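# e.g. content, html = readDocx(getReqContent(docx_url), True)   # docx_url is a placeholder
#      -> plain paragraph text, plus the same text wrapped paragraph-by-paragraph in <p> tags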
def convert_doc_to_docx_stream(url):
    doc_path = r'./tmp/tmp.doc'
    reqContent = getReqContent(url)
    with open(doc_path, 'wb') as f:
        f.write(reqContent)
    # start the word processor ('kwps.Application' targets WPS Office via COM)
    word = Dispatch("kwps.Application")
    word.Visible = False  # run in the background, no UI
    # open the .doc file
    doc = word.Documents.Open(doc_path)
    # an in-memory byte stream was originally planned but the file is saved to disk instead
    # docx_stream = io.BytesIO()
    # save the .doc as .docx; format code 16 corresponds to Word's wdFormatDocumentDefault (.docx)
    doc.SaveAs(r'./tmp/tmp.docx', 16)
    # close the document
    doc.Close()
    # quit the application
    word.Quit()
    log.info('doc文件已转换为docx文件')
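# Note: convert_doc_to_docx_stream leaves ./tmp/tmp.docx on disk as a side effect;
# the caller (getDic below) reads that fixed path and then calls clearTmp() to clean up.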
def clearTmp():
    folder_path = r'./tmp'
    # walk every file and sub-folder inside the temp folder
    for filename in os.listdir(folder_path):
        # build the full path
        file_path = os.path.join(folder_path, filename)
        # delete files and links; os.rmdir only removes empty sub-directories
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            os.rmdir(file_path)
    log.info('临时文件已删除')
def getDic(title, office, publishDate, expiry, type, timeliness, href, id_, dateDic):
    # id_list = getPdf(id_, title, publishDate)
    # if not id_list:
    #     log.error(f'{title}===附件下载失败')
    #     return ''
    # If either the publish date or the implementation date is missing, fall back to a search:
    # if not publishDate or not expiry:
    #     searchExpiry, searchPublishDate = searchDate(title)
    #     # if the implementation date was neither found by search nor collected, collection fails
    #     if not searchExpiry and not expiry:
    #         log.error(f'{title}===搜索后依旧没有实施日期')
    #         return {}
    #     # if the publish date was neither found by search nor collected, collection fails
    #     if not searchPublishDate and not publishDate:
    #         log.error(f'{title}===搜索后依旧没有发布日期')
    #         return {}
    #     # if a publish date was collected
    #     if publishDate:
    #         # it must match the searched publish date, otherwise collection fails
    #         if publishDate != searchPublishDate:
    #             log.error(f'{title}===搜索到发布时间与采集到发布时间不一致')
    #             return {}
    #     # otherwise take the searched publish date
    #     else:
    #         publishDate = searchPublishDate
    #     # if an implementation date was collected
    #     if expiry:
    #         # it must match the searched implementation date, otherwise collection fails
    #         if expiry != searchExpiry:
    #             log.error(f'{title}===搜索到实施日期与采集到实施日期不一致')
    #             return {}
    #     # otherwise take the searched implementation date
    #     else:
    #         expiry = searchExpiry
    publishDate = dateDic[title]['publishDate']
    expiry = dateDic[title]['expiry']
    try:
        id_list = selectMongo(href)
    except:
        log.info(f'之前没有采集')
        return {}
    attachmentId = id_list[0]
    sql = f'select full_path,category from clb_sys_attachment where id="{attachmentId}"'
    baseCore.cursor_.execute(sql)
    info = baseCore.cursor_.fetchone()
    fullPath = info[0]
    category = info[1].strip('.')
    log.info(f'{title}===开始获取正文===文件类型为{category}')
    try:
        if category == 'doc':
            # .doc files must first be converted to .docx before python-docx can read them
            convert_doc_to_docx_stream(fullPath)
            content, contentWithTag = readDocx(r'./tmp/tmp.docx', False)
            clearTmp()
        else:
            byteStream = getReqContent(fullPath)
            content, contentWithTag = readDocx(byteStream, True)
    except Exception as e:
        log.error(f'{title}===文件解析失败==={e}')
        return {}
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    dic_news = {
        'attachmentIds': id_list,  # attachment ids
        'author': '',  # author
        'content': content,  # body text without tags
        'contentWithTag': contentWithTag,  # body text with tags
        'createDate': now,  # creation time
        'deleteFlag': 0,  # delete flag (0 = default, 1 = deleted)
        'id': '',
        'labels': [{'relationId': "1788847783801794562", 'relationName': "国资国企法律法规", 'labelMark': "policy"}],
        # related label id / related label name / related label mark
        'origin': '',  # issuing authority of the policy
        'organ': office,  # drafting / enacting authority
        'topicClassification': '',  # policy document category
        'issuedNumber': '',  # document number
        'publishDate': publishDate,  # publication date of the policy / law
        'writtenDate': None,  # date of writing
        'implementDate': expiry,  # date of entry into force
        'sid': '1788838266435284993',  # information source id
        'sourceAddress': href,  # original link
        'summary': '',  # abstract
        'title': title,  # title
        'legalPrecedenceHierarchy': type,  # hierarchy of legal effect
        'effectiveness': timeliness,  # validity / timeliness status
    }
    return dic_news
def doJob():
    dateDic = getDate()
    searchList = ['国有资产', '国资', '国有企业', '企业', '公司']
    for search in searchList:
        search_ = urllib.parse.quote(search)
        url = f'https://flk.npc.gov.cn/api/?type=&fgbt={search_}&searchType=title%3Baccurate%3B1&sortTr=f_bbrq_s%3Bdesc&gbrqStart=&gbrqEnd=&sxrqStart=&sxrqEnd=&page=1&size=10'
        datasJson, totalSizes = getDataJson(url)
        # integer page count, as in the first script
        if totalSizes % 10 == 0:
            totalPage = totalSizes // 10
        else:
            totalPage = totalSizes // 10 + 1
        for page in range(1, totalPage + 1):
            if page != 1:
                url = url.replace(f'&page={page - 1}', f'&page={page}')
                datasJson, totalSizes = getDataJson(url)
            for dataJson in datasJson:
                id_ = dataJson['id']
                title = dataJson['title']
                office = dataJson['office']
                publishDate = dataJson['publish']
                expiry = dataJson['expiry']
                type = dataJson['type']
                status = dataJson['status']
                if status == '1':
                    timeliness = '有效'
                elif status == '5':
                    timeliness = '已修改'
                elif status == '9':
                    timeliness = '已废止'
                elif status == '3':
                    timeliness = '尚未生效'
                else:
                    timeliness = ''  # unknown status code; avoid an undefined variable below
                href = dataJson['url'].replace('./', 'https://flk.npc.gov.cn/')
                # only records queued in 'flk_bucai' are re-collected; skip those already in 'flk_ok'
                if is_member_containing_string_bucai(href):
                    if is_member_containing_string(href):
                        log.info(f'{title}===已补采')
                        continue
                    log.info(f'开始补采==={title}')
                    dic = getDic(title, office, publishDate, expiry, type, timeliness, href, id_, dateDic)
                    if dic:
                        flag = baseTool.sendKafka(dic)
                        if flag:
                            baseCore.r.sadd('flk_ok', href)
                    else:
                        log.error(f'{title}==={href}===获取失败')
                    time.sleep(2)
def getDate():
    # build {title: {'publishDate': ..., 'expiry': ...}} from the supplementary Excel sheet
    dic = {}
    df = pd.read_excel('./副本YJZX_国资国企法律法规-缺少时间补充v3.xlsx', sheet_name='Sheet1')
    titles = df['标题'].to_list()
    publishDates = df['发布时间'].to_list()
    expiries = df['实施时间'].to_list()
    for i in range(len(titles)):
        title = titles[i]
        publishDate = publishDates[i]
        publishDate = datetime.datetime.strptime(publishDate, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
        expiry = expiries[i]
        expiry = datetime.datetime.strptime(expiry, '%Y.%m.%d').strftime('%Y-%m-%d %H:%M:%S')
        dic[title] = {
            'publishDate': publishDate,
            'expiry': expiry
        }
    return dic
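# Resulting shape of dateDic, with illustrative placeholder values:
# {'某某法': {'publishDate': '2020-01-01 00:00:00', 'expiry': '2020-03-01 00:00:00'}}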
if __name__ == '__main__':
    doJob()