提交 88647541 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

import json
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from base import BaseCore
from apscheduler.schedulers.blocking import BlockingScheduler
# Project helper object; it supplies the logger used throughout this script.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()

# Browser-like User-Agent header attached to the page requests in getSoup().
headers = {
    'user-agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/106.0.0.0 Safari/537.36'
    ),
}
def sendKafka(dic_news):
    """Publish one record to the ``leadership`` Kafka topic as UTF-8 JSON.

    Delivery is best-effort: every failure (connection, serialisation,
    acknowledgement timeout) is logged and swallowed so that a broker
    problem never aborts the caller.

    Args:
        dic_news: JSON-serialisable object to publish.
    """
    producer = None
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        payload = json.dumps(dic_news, ensure_ascii=False).encode('utf-8')
        kafka_result = producer.send("leadership", payload)
        # Wait for the broker acknowledgement so a failed send surfaces here.
        log.info(kafka_result.get(timeout=10))
        log.info('传输成功')
    except Exception as e:
        # Keep the original message but include the cause for diagnosis.
        log.error(f'传输失败: {e}')
    finally:
        # A producer is created per call; release its sockets and background
        # thread instead of leaking them.  The timeout keeps close() from
        # waiting indefinitely on records that could not be delivered.
        if producer is not None:
            producer.close(timeout=10)
def getSoup(url, timeout=30):
    """Download a page, decode it as GBK and parse it with BeautifulSoup.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up.  Without
            a timeout a stalled connection would block the job forever.

    Returns:
        The ``BeautifulSoup`` tree built with the ``html.parser`` backend.

    Raises:
        requests.RequestException: On network errors or when the timeout
            expires.
        UnicodeDecodeError: If the response body is not valid GBK.
    """
    response = requests.get(url, headers=headers, timeout=timeout)
    # Decode explicitly and pass the text itself to BeautifulSoup.  Handing
    # it re-encoded UTF-8 bytes would make it guess the encoding again and
    # possibly trust the page's (now wrong) GBK <meta> declaration.
    html = response.content.decode('gbk')
    return BeautifulSoup(html, 'html.parser')
def gwybw():
    """Scrape ministry ("部委") leaders from district.ce.cn and print them.

    The index page lists departments under ``#right-part-gwybw``.  Every
    linked department page is fetched and each person found there becomes
    one entry of that department's ``persons`` list.  One
    ``{'department': ..., 'persons': [...]}`` dict is printed per department.

    Raises:
        requests.RequestException: If the index page cannot be fetched.
        UnicodeDecodeError: If the index page is not valid GBK.
        IndexError: If the index page has no ``#right-part-gwybw`` element.
    """
    url = 'http://district.ce.cn/zt/rwk/sf/bj/qx/201206/07/t20120607_1269157.shtml'
    soup = getSoup(url)
    paragraphs = soup.select('#right-part-gwybw')[0].select('div > div > div > p')

    # Step 1: collect (department name, detail-page URL) pairs from the index.
    departments = []
    for p in paragraphs:
        # A <strong><font> paragraph is a group heading, not a department.
        # Its text is not needed for the output, so it is simply skipped.
        if p.select('strong > font'):
            continue
        for a in p.find_all('a'):
            font = a.find('font')
            href = a.get('href')
            # Without a label or a link target there is nothing to scrape.
            if font is None or not href:
                continue
            departments.append((font.text, href))

    # Step 2: visit every department page and extract its people.
    for department, href in departments:
        try:
            detail = getSoup(href)
        except (requests.RequestException, UnicodeDecodeError) as e:
            # One unreachable page must not abort the remaining departments.
            log.error(f'{department} 页面获取失败: {e}')
            continue

        peoples = []
        blocks = detail.select('body > div.main > div.content > div.con_left > div.block')
        if blocks:
            content = blocks[0]
            # The i-th "title2" heading labels the i-th <ul>; zip() pairs
            # them by position and stops at the shorter of the two lists.
            for title, ul in zip(content.select('div.title2'), content.select('ul')):
                posts = title.text
                # "人事动态" (personnel news) is a news list, not a position.
                if posts == '人事动态':
                    continue
                for li in ul.select('li'):
                    # Keep only the text before the first "(" as the name.
                    name = li.text.split('(')[0]
                    images = li.select('a > img')
                    img_src = images[0].get('src', '') if images else ''
                    peoples.append({
                        'name': name,  # full name
                        'sex': '',  # sex
                        'work': posts,  # position
                        'birthplace': '',  # place of birth
                        'birthday': '',  # date of birth
                        'company': '',  # former employers
                        'city': '',  # associated city
                        'school': '',  # school graduated from
                        'province': '',  # province or municipality
                        'type': 1,  # category (1: ministries, 2: central directly-affiliated bodies, 3: local)
                        'department': department,  # department
                        'headSculpture': img_src,  # photo URL
                    })
        else:
            # Still emit the department (with no persons), but say why.
            log.error(f'{department} 页面结构异常, 未找到人员列表')

        # Transport format expected by the consumer:
        # {
        #     private String department;                // department
        #     private List<GovernmentLeaders> persons;  // people of the department
        # }
        data = {
            'department': department,
            'persons': peoples,
        }
        # NOTE: Kafka publishing is currently disabled; re-enable it by
        # calling sendKafka(data) here.
        print(data)
def zyzsjg():
    """Scrape leaders of centrally administered ("中直") bodies and print them.

    The index page lists the bodies under ``#right-part-zyzsjg``.  Every
    linked page is fetched and each person found there becomes one entry of
    that body's ``persons`` list.  One
    ``{'department': ..., 'persons': [...]}`` dict is printed per body.

    Raises:
        requests.RequestException: If the index page cannot be fetched.
        UnicodeDecodeError: If the index page is not valid GBK.
        IndexError: If the index page has no ``#right-part-zyzsjg`` element.
    """
    url = 'http://district.ce.cn/zt/rwk/sf/bj/qx/201206/07/t20120607_1269157.shtml'
    soup = getSoup(url)
    paragraphs = soup.select('#right-part-zyzsjg')[0].select('div > div > div > p')

    # Step 1: collect (department name, detail-page URL) pairs from the index.
    departments = []
    for p in paragraphs:
        # A <strong><font> paragraph is a group heading, not a department.
        # Its text is not needed for the output, so it is simply skipped.
        if p.select('strong > font'):
            continue
        # Only the first link of a paragraph is considered.
        a = p.find('a')
        font = a.find('font') if a is not None else None
        href = a.get('href') if a is not None else None
        # Entries without a labelled link have no detail page to visit, so
        # they contribute nothing to the output.
        if font is None or not href:
            continue
        departments.append((font.text, href))

    # Step 2: visit every department page and extract its people.
    for department, href in departments:
        try:
            detail = getSoup(href)
        except (requests.RequestException, UnicodeDecodeError) as e:
            # One unreachable page must not abort the remaining departments.
            log.error(f'{department} 页面获取失败: {e}')
            continue

        peoples = []
        blocks = detail.select('body > div.main > div.content > div.con_left > div.block')
        if blocks:
            content = blocks[0]
            # The i-th "title2" heading labels the i-th <ul>; zip() pairs
            # them by position and stops at the shorter of the two lists.
            for title, ul in zip(content.select('div.title2'), content.select('ul')):
                posts = title.text
                # "人事动态" (personnel news) is a news list, not a position.
                if posts == '人事动态':
                    continue
                for li in ul.select('li'):
                    # Keep only the text before the first "(" as the name.
                    name = li.text.split('(')[0]
                    images = li.select('a > img')
                    img_src = images[0].get('src', '') if images else ''
                    peoples.append({
                        'name': name,  # full name
                        'sex': '',  # sex
                        'work': posts,  # position
                        'birthplace': '',  # place of birth
                        'birthday': '',  # date of birth
                        'company': '',  # former employers
                        'city': '',  # associated city
                        'school': '',  # school graduated from
                        'province': '',  # province or municipality
                        'type': 2,  # category (1: ministries, 2: central directly-affiliated bodies, 3: local)
                        'department': department,  # department
                        'headSculpture': img_src,  # photo URL
                    })
        else:
            # Still emit the department (with no persons), but say why.
            log.error(f'{department} 页面结构异常, 未找到人员列表')

        # Transport format expected by the consumer:
        # {
        #     private String department;                // department
        #     private List<GovernmentLeaders> persons;  // people of the department
        # }
        data = {
            'department': department,
            'persons': peoples,
        }
        # NOTE: Kafka publishing is currently disabled; re-enable it by
        # calling sendKafka(data) here.
        print(data)
def gwybw_task():
    """Schedule gwybw() with a monthly cron trigger and start the scheduler.

    Any ``Exception`` raised while starting or running the scheduler is
    printed rather than propagated.
    """
    monthly = BlockingScheduler()
    # Cron fields: day 1 of every month at 00:00.
    monthly.add_job(gwybw, trigger='cron', day='1', hour=0, minute=0)
    try:
        monthly.start()
    except Exception as err:
        print('定时采集异常', err)
def zyzsjg_task():
    """Schedule zyzsjg() with a monthly cron trigger and start the scheduler.

    Any ``Exception`` raised while starting or running the scheduler is
    printed rather than propagated.
    """
    monthly = BlockingScheduler()
    # Cron fields: day 1 of every month at 00:00.
    monthly.add_job(zyzsjg, trigger='cron', day='1', hour=0, minute=0)
    try:
        monthly.start()
    except Exception as err:
        print('定时采集异常', err)
if __name__ == "__main__":
    # BlockingScheduler.start() blocks the calling thread, so calling
    # gwybw_task() and then zyzsjg_task() would never reach the second one
    # while the first scheduler is running.  Register both monthly jobs on
    # a single scheduler so that both are actually scheduled.
    scheduler = BlockingScheduler()
    # Cron fields: day 1 of every month at 00:00.
    scheduler.add_job(gwybw, 'cron', day='1', hour=0, minute=0)
    scheduler.add_job(zyzsjg, 'cron', day='1', hour=0, minute=0)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Normal shutdown (Ctrl-C or process exit); not a collection error.
        log.info('定时采集已停止')
    except Exception as e:
        log.error(f'定时采集异常: {e}')
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
请 注册 或者 登录 后发表评论