Commit f7c06cc2 Author: 薛凌堃
# Policy & regulations: remove duplicate rows. Each duplicate occurs exactly
# twice, and one of the two rows has typeid 1666.
import cx_Oracle

# Connect to the database
conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')

# Find every sourceaddress that occurs more than once under the type tree
# rooted at id=1622 (CONNECT BY walks the cis_law_type hierarchy).
cursor = conn.cursor()
cursor.execute("""
    SELECT sourceaddress FROM (
        select t.sourceaddress, count(1) cnn
        from CIS_LAW_DATA t
        where t.typeid in (select id from cis_law_type start with id = 1622 connect by prior id = PID)
        group by t.sourceaddress
        order by cnn desc
    ) T WHERE SOURCEADDRESS IS NOT NULL AND CNN > 1
""")
result = cursor.fetchall()

i = 0
for row in result:
    i = i + 1
    url = row[0]
    cursor.execute(f"select typeid, id from CIS_LAW_DATA where sourceaddress='{url}'")
    lawInfoList = cursor.fetchall()
    # Only the two-row duplicates described above are handled
    if len(lawInfoList) != 2:
        continue
    id1, typeid1 = lawInfoList[0][1], lawInfoList[0][0]
    id2, typeid2 = lawInfoList[1][1], lawInfoList[1][0]
    if typeid1 == typeid2:
        cursor.execute(f"delete from CIS_LAW_DATA where id='{id1}'")
        conn.commit()
        print(f"{i}---same typeid, deleted the first row-----{id1}--{typeid1}--{id2}--{typeid2}")
        continue
    if typeid1 == 1666:
        cursor.execute(f"delete from CIS_LAW_DATA where id='{id1}'")
        conn.commit()
        print(f"{i}---typeid1 is 1666, deleted the first row-----{id1}--{typeid1}--{id2}--{typeid2}")
        continue
    if typeid2 == 1666:
        cursor.execute(f"delete from CIS_LAW_DATA where id='{id2}'")
        conn.commit()
        print(f"{i}---typeid2 is 1666, deleted the second row-----{id1}--{typeid1}--{id2}--{typeid2}")
        continue
    print(f"{i}----cannot delete----{id1}--{typeid1}--{id2}--{typeid2}")

# Close the database connection
cursor.close()
conn.close()
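A note on the script above: the id and url values are interpolated into SQL via f-strings, which breaks on quotes inside a sourceaddress and is open to injection. A minimal sketch of the same statements using cx_Oracle bind variables (same CIS_LAW_DATA schema assumed; the helper names are illustrative, not from the original commit):

# Sketch: the same lookup/delete with bind variables instead of f-strings.
# Helper names are illustrative; assumes the CIS_LAW_DATA table used above.
def fetch_rows_for_url(cursor, url):
    # :url is bound by cx_Oracle, so quoting in the value is handled safely
    cursor.execute(
        "select typeid, id from CIS_LAW_DATA where sourceaddress = :url",
        url=url,
    )
    return cursor.fetchall()

def delete_by_id(cursor, conn, row_id):
    cursor.execute("delete from CIS_LAW_DATA where id = :row_id", row_id=row_id)
    conn.commit()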
# coding: utf-8

# In[1]:
from bs4 import BeautifulSoup
import pandas as pd
import requests
import re
import time
import json

# In[ ]:
list_every_year_info = []
for year in range(2012, 2023):
    print(year)
    url_all = "https://www.caifuzhongwen.com/fortune500/paiming/global500/{}_%e4%b8%96%e7%95%8c500%e5%bc%ba.htm".format(year)
    list_all_info_url = []
    list_top = []
    list_name_ch = []
    list_name_en = []
    list_shouru = []
    list_lirun = []
    list_country = []
    list_hangye = []
    list_zongbu = []
    list_ceo = []
    list_renshu = []
    list_shouru_add = []
    list_lirun_add = []
    list_zichan = []
    list_quanyi = []
    list_jinglilv = []
    list_shouyilv = []
    list_url = []

    response_all = requests.get(url_all)
    soup_all = BeautifulSoup(response_all.content, 'html.parser')
    list_all_com = soup_all.find('tbody', {'style': 'word-break:break-all'}).find_all('tr')

    top = 1
    for com in list_all_com[1:]:  # collect company names and build each company's detail URL
        list_com_info = com.find_all('td')
        name_ch = re.findall(">(.*?)<", str(list_com_info[1]))[1]
        name_en = re.findall(">(.*?)<", str(list_com_info[1]))[2]
        url_com = "https://www.caifuzhongwen.com/fortune500/" + list_com_info[1].find('a').get('href')[5:]
        list_top.append(top)
        list_name_ch.append(name_ch)
        list_name_en.append(name_en)
        list_all_info_url.append(url_com)
        top = top + 1

    # The financial figures live in an inline <script> whose position varies
    # by year, so fall back through the candidate script tags until one matches.
    try:
        try:
            soup_text = soup_all.find_all('script', {'src': ''})[1].text.replace("\n", "").replace("\t", '').replace(" ", '')
            soup_text_2 = re.findall("varcompanyDetails=(.*?)vartable", soup_text)[0].replace("\\", "").replace("item1", "'item1'").replace("item2", "'item2'").replace("item3", "'item3'")
        except Exception:
            soup_text = soup_all.find_all('script', {'src': ''})[2].text.replace("\n", "").replace("\t", '').replace(" ", '')
            soup_text_2 = re.findall("varcompanyDetails=(.*?)vartable", soup_text)[0].replace("\\", "").replace("item1", "'item1'").replace("item2", "'item2'").replace("item3", "'item3'")
    except Exception:
        soup_text = soup_all.find_all('script', {'src': ''})[3].text.replace("\n", "").replace("\t", '').replace(" ", '')
        soup_text_2 = re.findall("varcompanyDetails=(.*?)vartable", soup_text)[0].replace("\\", "").replace("item1", "'item1'").replace("item2", "'item2'").replace("item3", "'item3'")

    # Every 8 consecutive dicts in the script belong to one company.
    dic_list = re.findall('{(.*?)}', soup_text_2[1:])
    list_all_com_money_info = []
    list_one_com_money_info = []
    num = 0
    for i in range(0, len(dic_list)):
        num = num + 1
        if num == 7 or num == 8:
            one_con_money_info1 = "{" + dic_list[i] + "}"
        else:
            one_con_money_info1 = "{" + dic_list[i][:-1] + "}"
        one_con_money_info2 = one_con_money_info1.replace("'", '"')
        dic_one_con_money_info2 = json.loads(one_con_money_info2)
        list_one_com_money_info.append(dic_one_con_money_info2)
        if num == 8:
            list_all_com_money_info.append(list_one_com_money_info)
            list_one_com_money_info = []
            num = 0

    # Pull revenue and related figures for each company from the dicts above
    for one_con_money_info in list_all_com_money_info:
        shouru = one_con_money_info[1]['item2']
        shouru_add = one_con_money_info[1]['item3']
        lirun = one_con_money_info[2]['item2']
        lirun_add = one_con_money_info[2]['item3']
        zichan = one_con_money_info[3]['item2']
        quanyi = one_con_money_info[4]['item2']
        jinglilv = one_con_money_info[6]['item2']
        shouyilv = one_con_money_info[7]['item2']
        list_shouru.append(shouru)
        list_shouru_add.append(shouru_add)
        list_lirun.append(lirun)
        list_lirun_add.append(lirun_add)
        list_zichan.append(zichan)
        list_quanyi.append(quanyi)
        list_jinglilv.append(jinglilv)
        list_shouyilv.append(shouyilv)

    # Open each company's page and collect its profile; keep the time.sleep
    # between requests to stay polite to the site.
    for com_url in list_all_info_url:
        response_one_com_info = requests.get(com_url)
        soup_one_url_info = BeautifulSoup(response_one_com_info.content, 'html.parser')
        list_one_com_info = soup_one_url_info.find('table').find_all('tr')
        ceo = list_one_com_info[0].find_all('td')[1].text
        country = list_one_com_info[1].find_all('td')[1].text
        hangye = list_one_com_info[2].find_all('td')[1].text
        zongbu = list_one_com_info[3].find_all('td')[1].text
        renshu = list_one_com_info[4].find_all('td')[1].text
        url = list_one_com_info[5].find_all('td')[1].text
        list_ceo.append(ceo)
        list_country.append(country)
        list_hangye.append(hangye)
        list_zongbu.append(zongbu)
        list_renshu.append(renshu)
        list_url.append(url)
        print(com_url + ": done")
        time.sleep(2)

    dic_all_com_info = {
        '排名': list_top,
        '中文名称': list_name_ch,
        '英文名称': list_name_en,
        '营业收入(百万美元)': list_shouru,
        '利润(百万美元)': list_lirun,
        '企业所属国家': list_country,
        '行业': list_hangye,
        '企业总部地址': list_zongbu,
        '企业首席执行官(CEO)': list_ceo,
        '企业员工数': list_renshu,
        '企业官网': list_url,
        '营业收入:百万美元': list_shouru,
        '营业收入:年增减%': list_shouru_add,
        '利润:百万美元': list_lirun,
        '利润:年增减%': list_lirun_add,
        '资产:百万美元': list_zichan,
        '资产:年增减%': '--',
        '股东权益:百万美元': list_quanyi,
        '股东权益:年增减%': '--',
        '利润占比:%': 'None',
        '利润占比:年增减%': 'None',
        '净利率:%': list_jinglilv,
        '净利率:年增减%': 'None',
        '资产收益率:%': list_shouyilv,
        '资产收益率:年增减%': 'None'
    }
    # was list_all_year (undefined); accumulate into the list initialised above
    list_every_year_info.append(dic_all_com_info)
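pandas is imported above but never used; a minimal sketch of how the per-year dictionaries collected in list_every_year_info could be written out with it (the output filename is illustrative, not from the original script):

# Sketch: persist each year's scraped dictionary as one Excel sheet.
# Filename is illustrative; .xlsx output needs the openpyxl package installed.
with pd.ExcelWriter("fortune500_2012_2022.xlsx") as writer:
    for year, dic in zip(range(2012, 2023), list_every_year_info):
        # scalar values such as '--' broadcast across the column automatically
        pd.DataFrame(dic).to_excel(writer, sheet_name=str(year), index=False)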
++ "b/\347\231\276\345\272\246\351\207\207\351\233\206/.gitkeep"
"""
"""
任务集成测试
1、连接redis做取出
2、连接kafka做信息的获取,与存储
"""
import time
import json
import itertools
import configparser
import concurrent.futures

import redis
from kafka import KafkaConsumer

from baiduSpider import BaiduSpider
from baseCore import BaseCore
class BaiduTaskJob(object):

    def __init__(self):
        # Read connection settings from config.ini (see the sections below)
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             port=self.config.get('redis', 'port'),
                             password=self.config.get('redis', 'pass'), db=0)
    def getkafka(self):
        # Kafka cluster address, topic and consumer group come from config.ini
        bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
        topic = self.config.get('kafka', 'topic')
        groupId = self.config.get('kafka', 'groupId')
        consumer = KafkaConsumer(topic, group_id=groupId,
                                 bootstrap_servers=[bootstrap_servers],
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        keymsg = {}  # initialised so an empty consumer cannot leave it unbound
        try:
            for record in consumer:
                try:
                    logger.info(f"value:{record.value}")
                    keymsg = record.value
                    if keymsg:
                        break
                except Exception as e:
                    logger.info(f"msg.value error:{e}")
        except KeyboardInterrupt:
            keymsg = {}
        finally:
            consumer.close()
        return keymsg
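    # Example of the message shape this consumer yields, inferred from the key
    # lookups in paserKeyMsg below; all values are illustrative:
    # {"id": "1001", "wordsCode": "WC2023", "searchEngines": ["3"],
    #  "keyWord": "(A|B)+(C|D)"}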
    def getkeyFromredis(self, codeid):
        kvalue = self.r.get('KEY_WORDS_TO_REDIS::' + codeid)
        kvalue = kvalue.decode('utf-8')
        return json.loads(kvalue)
    def getkeywords(self, keywords):
        # Expand a keyword expression into concrete search phrases:
        # parenthesised groups joined by '+' are combined via Cartesian
        # product; a plain expression just splits on '|'.
        kwList = []
        if ')+(' in keywords or '+(' in keywords:
            kk2 = []
            for k2 in keywords.split('+'):
                k2 = k2.strip("()")
                kk2.append(k2.split('|'))
            # itertools.product(*kk2) generalises the original hand-written
            # branches for 2, 3 and 4 groups to any number of groups
            for res in itertools.product(*kk2):
                kwList.append('+'.join(res))
        else:
            kwList = keywords.split("|")
        return kwList
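    # Usage sketch (inputs illustrative):
    #   getkeywords('(A|B)+(C|D)')  -> ['A+C', 'A+D', 'B+C', 'B+D']
    #   getkeywords('A|B|C')        -> ['A', 'B', 'C']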
    def paserKeyMsg(self, keymsg):
        logger.info('----------')
        wordsCode = keymsg['wordsCode']
        id = keymsg['id']
        try:
            searchEngines = keymsg['searchEngines']
        except Exception:
            searchEngines = []
        kwList = []
        # Only engine code '3' is handled here; messages without a
        # searchEngines field are processed as well.
        if searchEngines and '3' not in searchEngines:
            return kwList
        keyword = keymsg['keyWord']
        for kw in self.getkeywords(keyword):
            kwList.append({
                'kw': kw,
                'wordsCode': wordsCode,
                'sid': id
            })
        return kwList
    def runSpider(self, kwmsg):
        searchkw = kwmsg['kw']
        wordsCode = kwmsg['wordsCode']
        sid = kwmsg['sid']
        baiduSpider = BaiduSpider(searchkw, wordsCode, sid)
        try:
            baiduSpider.get_page_html()
            baiduSpider.get_detail_html()
        except Exception as e:
            logger.info('Baidu search failed: ' + searchkw)
        finally:
            # quit the driver even on failure so Chrome processes do not leak
            baiduSpider.driver.quit()
            logger.info("keyword crawl finished: " + searchkw)
if __name__ == '__main__':
    # ss='道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
    # keymsglist=getkeywords(ss)
    # print(keymsglist)
    baiduTaskJob = BaiduTaskJob()
    baseCore = BaseCore()
    # the class methods above resolve `logger` as this module-level global
    logger = baseCore.getLogger()
    print('---------------')
    while True:
        try:
            try:
                keymsg = baiduTaskJob.getkafka()
                kwList = baiduTaskJob.paserKeyMsg(keymsg)
            except Exception as e:
                logger.info("failed to fetch a task from kafka!")
                time.sleep(5)
                continue
            if kwList:
                # thread pool of 2 workers; each task crawls one keyword
                with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
                    results = [executor.submit(baiduTaskJob.runSpider, data) for data in kwList]
                    # collect results as the crawls finish
                    for future in concurrent.futures.as_completed(results):
                        try:
                            result = future.result()
                            logger.info(f"task finished: {result}")
                        except Exception as e:
                            logger.info(f"task raised an exception: {e}")
        except Exception as e:
            logger.info('crawl error')
[redis]
host=114.115.236.206
port=6379
pass=clbzzsn
[mysql]
host=114.115.159.144
username=root
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
[kafka]
bootstrap_servers=114.115.159.144:9092
topic=keyWordsInfo
groupId=python_baidu_test
[selenium]
chrome_driver=C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe
binary_location=D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe
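The [selenium] paths are not referenced by the task script shown here (BaiduSpider presumably consumes them); a minimal sketch of how they could be loaded, using the standard Selenium 4 Service/Options API:

# Sketch (assumed usage, not taken from the shown code): build a Chrome
# driver from the [selenium] entries in config.ini above.
import configparser
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service

config = configparser.ConfigParser()
config.read('config.ini')
options = Options()
options.binary_location = config.get('selenium', 'binary_location')
driver = webdriver.Chrome(service=Service(config.get('selenium', 'chrome_driver')),
                          options=options)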