Commit 62fa9e22 Author: XveLingKun

20241104

Parent e943c06c
This is a sample text.
\ No newline at end of file
This is another sample text.
\ No newline at end of file
@@ -1387,7 +1387,17 @@ def getkeywords(keywords):
kwList=k3
return kwList
def test31():
aaa = []
def test():
return 1, 2
c = test()
aaa.append((3, c))
print(aaa)
for i, j in aaa:
print(i)
print(j)
pass
if __name__ == "__main__":
# # import queue
@@ -1512,33 +1522,34 @@ if __name__ == "__main__":
# print(aaa)
# aaa = int("07")
# print(aaa)
title = "党建论文│工控科产党委“1+2+V”大党建工作格局推动党建工作与生产经营深度融合"
content = "党建工作和深度融合"
keywords = "(浙江|北京)+(尼日利亚|科特迪瓦)+(活动|访问)"
keywords_split = getkeywords(keywords)
print(keywords_split)
tf_title = 0 # count of the current rule's keywords appearing in the title
tf_content = 0 # count of the current rule's keywords appearing in the content
for kw in keywords_split:
if "+" in kw:
# todo: 2024-10-15 the keywords must all appear together; if they do not, the score is 0
kws = kw.split("+")
for k in kws:
c_t = str(title).lower().count(k)
c_c = str(content).lower().count(k)
if c_c:
# if it appears in the article body
tf_content += c_c
else:
tf_content = 0
break
if c_t:
tf_title += c_t
else:
tf_title = 0
break
else:
tf_title += str(title).lower().count(kw)
tf_content += str(content).lower().count(kw)
print(tf_title)
print(tf_content)
\ No newline at end of file
# title = "党建论文│工控科产党委“1+2+V”大党建工作格局推动党建工作与生产经营深度融合"
# content = "党建工作和深度融合"
# keywords = "(浙江|北京)+(尼日利亚|科特迪瓦)+(活动|访问)"
# keywords_split = getkeywords(keywords)
# print(keywords_split)
# tf_title = 0 # count of the current rule's keywords appearing in the title
# tf_content = 0 # count of the current rule's keywords appearing in the content
# for kw in keywords_split:
# if "+" in kw:
# # todo: 2024-10-15 the keywords must all appear together; if they do not, the score is 0
# kws = kw.split("+")
# for k in kws:
# c_t = str(title).lower().count(k)
# c_c = str(content).lower().count(k)
# if c_c:
# # if it appears in the article body
# tf_content += c_c
# else:
# tf_content = 0
# break
# if c_t:
# tf_title += c_t
# else:
# tf_title = 0
# break
# else:
# tf_title += str(title).lower().count(kw)
# tf_content += str(content).lower().count(kw)
# print(tf_title)
# print(tf_content)
test31()
\ No newline at end of file
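# A minimal sketch (not part of this commit) of the "+" co-occurrence rule exercised above,
# under one reading of the todo note: if any member of a "+"-joined keyword group is missing
# from the text, the whole score becomes 0. The helper name count_rule_hits is hypothetical;
# it only assumes keywords_split is the list returned by getkeywords().
def count_rule_hits(text, keywords_split):
    text = str(text).lower()
    total = 0
    for kw in keywords_split:
        parts = kw.split("+")
        counts = [text.count(p) for p in parts]
        if len(parts) > 1 and not all(counts):
            return 0  # a required keyword of the group is missing, so the rule scores 0
        total += sum(counts)
    return total

# Example (hypothetical call): count_rule_hits(title, keywords_split) and
# count_rule_hits(content, keywords_split) would replace the tf_title / tf_content loops above.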
import requests, json, time, pymysql, sys
@@ -3,7 +3,7 @@ import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
list_com_code = []
@@ -38,12 +38,12 @@ list_all_info = []
for com_code in list_com_code[0:]:
print(com_code)
url_xueqiu = f'https://stock.xueqiu.com/v5/stock/f10/cn/company.json?symbol={com_code}'
json_xueqiu = requests.get(url_xueqiu, headers=headers).json()
dic_com = json_xueqiu['data']['company']
com_type = dic_com['classi_name']
# url_xueqiu = f'https://stock.xueqiu.com/v5/stock/f10/cn/company.json?symbol={com_code}'
#
# json_xueqiu = requests.get(url_xueqiu, headers=headers).json()
# dic_com = json_xueqiu['data']['company']
#
# com_type = dic_com['classi_name']
# com_money = dic_com['issue_price']*dic_com['actual_issue_vol']
url_dongfang = f'https://emweb.eastmoney.com/PC_HSF10/CompanySurvey/PageAjax?code={com_code}'
@@ -123,8 +123,8 @@ for com_code in list_com_code[0:]:
list_all_info_tuple = []
for list_info in list_all_info:
list_all_info_tuple.append(tuple(list_info))
with cnx.cursor() as cursor:
Upsql = ''' update sys_base_enterprise_ipo set enterprise_type = %s,total_market_value = %s,before_total_market_value = %s,operating_revenue = %s,operating_revenue_rate = %s,profit = %s,profit_rate = %s,assets = %s,return_on_assets = %s,shareholders_equity = %s where securities_code = %s '''
cursor.executemany(Upsql, list_all_info_tuple)
cnx.commit()
#
# with cnx.cursor() as cursor:
# Upsql = ''' update sys_base_enterprise_ipo set enterprise_type = %s,total_market_value = %s,before_total_market_value = %s,operating_revenue = %s,operating_revenue_rate = %s,profit = %s,profit_rate = %s,assets = %s,return_on_assets = %s,shareholders_equity = %s where securities_code = %s '''
# cursor.executemany(Upsql, list_all_info_tuple)
# cnx.commit()
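# A hedged sketch of how the commented-out update above could be driven once re-enabled.
# Each tuple in list_all_info_tuple has to follow the %s placeholder order of Upsql, with
# securities_code last because it feeds the WHERE clause. The helper name run_update is
# hypothetical; cnx is the pymysql connection opened at the top of this script.
def run_update(cnx, upsql, rows):
    if not rows:
        return 0
    with cnx.cursor() as cursor:
        affected = cursor.executemany(upsql, rows)  # number of affected rows
    cnx.commit()
    return affected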
from bs4 import BeautifulSoup
import requests,time,re
import sys
sys.path.append('D:\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
log = baseCore.getLogger()
taskType = '500强专利'
# headers = {
# "Cookie":"currentUrl=https%3A%2F%2Fworldwide.espacenet.com%2Fdata%2FsearchResults%3Fsubmitted%3Dtrue%26locale%3Dcn_EP%26DB%3DEPODOC%26ST%3Dadvanced%26TI%3D%26AB%3D%26PN%3D%26AP%3D%26PR%3D%26PD%3D2022%25202021%25202020%25202019%26PA%3Dapple%26IN%3D%26CPC%3D%26IC%3D%26rnd%3D1663641959596; PGS=10; _pk_id.93.72ee=ee83303e45a089a1.1663061058.; org.springframework.web.servlet.i18n.CookieLocaleResolver.LOCALE=cn_EP; _pk_ses.93.72ee=1; LevelXLastSelectedDataSource=EPODOC; menuCurrentSearch=%2F%2Fworldwide.espacenet.com%2FsearchResults%3FAB%3D%26AP%3D%26CPC%3D%26DB%3DEPODOC%26IC%3D%26IN%3D%26PA%3Dapple%26PD%3D2022%26PN%3D%26PR%3D%26ST%3Dadvanced%26Submit%3D%E6%A3%80%E7%B4%A2%26TI%3D%26locale%3Dcn_EP; currentUrl=https%3A%2F%2Fworldwide.espacenet.com%2FsearchResults%3Fsubmitted%3Dtrue%26locale%3Dcn_EP%26DB%3DEPODOC%26ST%3Dadvanced%26TI%3D%26AB%3D%26PN%3D%26AP%3D%26PR%3D%26PD%3D2022%26PA%3Dapple%26IN%3D%26CPC%3D%26IC%3D%26Submit%3D%25E6%25A3%2580%25E7%25B4%25A2; JSESSIONID=qnLt7d5QgBtpUGjMmHuD-kwJ.espacenet_levelx_prod_2",
# "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
# }
# df_all = pd.read_excel('D:\\kkwork\\jupyter\\专利数量\\t1.xlsx')
# for i in range(2022,1890,-1):
# df_all[f'{i}'] = ''
# df_all['Espacenet专利检索'] = ''
headers = {
"Cookie": "currentUrl=https%3A%2F%2Fworldwide.espacenet.com%2Fdata%2FsearchResults%3Fsubmitted%3Dtrue%26locale%3Dcn_EP%26DB%3DEPODOC%26ST%3Dadvanced%26TI%3D%26AB%3D%26PN%3D%26AP%3D%26PR%3D%26PD%3D2022%25202021%25202020%25202019%26PA%3Dapple%26IN%3D%26CPC%3D%26IC%3D%26rnd%3D1663641959596; PGS=10; _pk_id.93.72ee=ee83303e45a089a1.1663061058.; org.springframework.web.servlet.i18n.CookieLocaleResolver.LOCALE=cn_EP; _pk_ses.93.72ee=1; LevelXLastSelectedDataSource=EPODOC; menuCurrentSearch=%2F%2Fworldwide.espacenet.com%2FsearchResults%3FAB%3D%26AP%3D%26CPC%3D%26DB%3DEPODOC%26IC%3D%26IN%3D%26PA%3Dapple%26PD%3D2022%26PN%3D%26PR%3D%26ST%3Dadvanced%26Submit%3D%E6%A3%80%E7%B4%A2%26TI%3D%26locale%3Dcn_EP; currentUrl=https%3A%2F%2Fworldwide.espacenet.com%2FsearchResults%3Fsubmitted%3Dtrue%26locale%3Dcn_EP%26DB%3DEPODOC%26ST%3Dadvanced%26TI%3D%26AB%3D%26PN%3D%26AP%3D%26PR%3D%26PD%3D2022%26PA%3Dapple%26IN%3D%26CPC%3D%26IC%3D%26Submit%3D%25E6%25A3%2580%25E7%25B4%25A2; JSESSIONID=qnLt7d5QgBtpUGjMmHuD-kwJ.espacenet_levelx_prod_2",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
}
def name_handle(english_name_):
if 'INC.' in english_name_ or 'LTD.' in english_name_ or 'CO.' in english_name_ \
or 'CORP.' in english_name_ or 'GMBH' in english_name_ \
or ' AG' in english_name_ or 'SARL' in english_name_ or 'S.A.' in english_name_ \
or 'PTY' in english_name_ or 'LLC' in english_name_ or 'LLP' in english_name_ \
or ' AB' in english_name_ or ' NV' in english_name_ or 'N.V.' in english_name_ \
or 'A.S.' in english_name_ or ' SA' in english_name_ or ',Limited' in english_name_ \
or ' SE' in english_name_ or ' PLC' in english_name_ or 'S.P.A.' in english_name_:
english_name = english_name_.replace('INC.', '').replace('LTD.', '').replace('CO.', '').replace('CORP.', '') \
.replace('GMBH', '').replace(' AG', '').replace('SARL', '').replace('S.A.', '').replace('PTY', '') \
.replace('LLC', '').replace('LLP', '').replace(' AB', '').replace(' NV', '').replace(',', '') \
.replace('A.S.', '').replace(' SA', '').replace(',Limited', '').replace(' SE', '').replace(' PLC', '') \
.replace('N.V.', '').replace('S.P.A.', '').rstrip()
return english_name
else:
english_name = english_name_
return english_name
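# A sketch of the same suffix-stripping idea with the trigger words and the removed words kept
# in a single list, so the membership test and the replace chain cannot drift apart. It assumes,
# as the caller below does, that the name has already been upper-cased; COMPANY_SUFFIXES and
# strip_company_suffixes are hypothetical names, not the committed implementation.
COMPANY_SUFFIXES = ['INC.', 'LTD.', 'CO.', 'CORP.', 'GMBH', ' AG', 'SARL', 'S.A.', 'PTY',
                    'LLC', 'LLP', ' AB', ' NV', 'N.V.', 'A.S.', ' SA', ',LIMITED', ' SE',
                    ' PLC', 'S.P.A.']

def strip_company_suffixes(name):
    for suffix in COMPANY_SUFFIXES:
        name = name.replace(suffix, '')
    return name.replace(',', '').rstrip()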
if __name__ == '__main__':
while True:
start_time = time.time()
# use the social credit code pulled from Redis to look up the matching base record in the database
social_code = baseCore.redicPullData('Zhuanli:gwSocial_code')
# social_code = '9111000071093123XX'
# if there is no more data in Redis, wait
if social_code == None:
# time.sleep(20)
break
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
# push the code back into Redis
baseCore.rePutIntoR('Zhuanli:gwSocial_code', social_code)
continue
id = data[0]
com_name = data[1]
xydm = data[2]
english_name_ = data[5]
place = data[6]
if place == 1:
log.info(f'{com_name}--国内')
baseCore.rePutIntoR('ZhuanLi:gnshSocial_code',social_code)
continue
if english_name_:
pass
else:
query = f"select * from sys_base_enterprise where social_credit_code ='{xydm}'"
cursor_.execute(query)
result = cursor_.fetchone()
english_name_ = result[32]
# todo: write this field back to the 144 enterprise database
update_ = f"update EnterpriseInfo set EnglishName='{english_name_}' where SocialCode='{xydm}' "
cursor.execute(update_)
cnx.commit()
english_name_ = english_name_.upper()
english_name = name_handle(english_name_)
num_zhuanli = 0
# url1 = f'https://worldwide.espacenet.com/data/searchResults?ST=singleline&locale=cn_EP&submitted=true&DB=&query={com_name}&rnd=' + str(
# int(float(time.time()) * 1000))
#
# res1 = requests.get(url1, headers=headers)
# soup1 = BeautifulSoup(res1.content, 'html.parser')
#
# num_text = soup1.find('p', {'class': 'numResultsFoundMsg'}).text
#
# try:
# zhuanli = re.findall("约(.*?)个", num_text)[0].replace(',', '')
# except:
# zhuanli = re.findall("多于(.*?)个", num_text)[0].replace(',', '')
# if zhuanli:
for year in range(2023, 1900, -1):
url = f'https://worldwide.espacenet.com/data/searchResults?submitted=true&locale=cn_EP&DB=EPODOC&ST=advanced&TI=&AB=&PN=&AP=&PR=&PD={year}&PA={english_name}&IN=&CPC=&IC=&rnd=' + str(
int(float(time.time()) * 1000))
# url = 'https://worldwide.espacenet.com/data/searchResults?submitted=true&locale=cn_EP&DB=EPODOC&ST=advanced&TI=&AB=&PN=&AP=&PR=&PD=2022&PA=APPLE&IN=&CPC=&IC=&rnd=1703643229331'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip)
if res.status_code == 200:
log.info('使用代理')
pass
else:
try:
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip)
if res.status_code == 200:
log.info('使用代理')
else:
res = requests.get(url, headers=headers)
if res.status_code == 200:
log.info('未使用代理')
except:
res = requests.get(url, headers=headers)
log.info('未使用代理')
soup = BeautifulSoup(res.content, 'html.parser')
num_text = soup.find('p', {'class': 'numResultsFoundMsg'}).text
try:
try:
zhuanli = int(re.findall("约(.*?)个", num_text)[0].replace(',', ''))
except:
zhuanli = int(re.findall("多于(.*?)个", num_text)[0].replace(',', ''))
except:
zhuanli = int(re.findall("找到(.*?)个", num_text)[0].replace(',', ''))
if zhuanli == 0:
dic_info = {
'com_name': com_name,
'social_code': social_code,
}
# insert into the database table
selectSql = f"select count(1) from zhuanli_500 where social_code='{xydm}' "
cursor.execute(selectSql)
count = cursor.fetchone()[0]
if count > 0:
log.info(f"{com_name}-----已经存在--{year}--无专利信息")
break
else:
values_tuple = tuple(dic_info.values())
# log.info(f"{gpdm}-------{companyname}---新增")
insertSql = f"insert into zhuanli_500(com_name,social_code) values (%s,%s)"
cursor.execute(insertSql, values_tuple)
cnx.commit()
log.info(f"{com_name}------新增----无专利信息")
break
dic_info = {
'com_name': com_name,
'social_code': social_code,
'year': year,
'num': zhuanli
}
# insert into the database table
selectSql = f"select count(1) from zhuanli_500 where social_code='{xydm}' and year='{year}' "
cursor.execute(selectSql)
count = cursor.fetchone()[0]
if count > 0:
log.info(f"{com_name}-------{year}---已经存在")
continue
else:
values_tuple = tuple(dic_info.values())
# log.info(f"{gpdm}-------{companyname}---新增")
insertSql = f"insert into zhuanli_500(com_name,social_code,year,num) values (%s,%s,%s,%s)"
cursor.execute(insertSql, values_tuple)
cnx.commit()
log.info(f"{com_name}-------{year}---新增")
except Exception as e:
log.info("error!{}".format(social_code))
log.info(e)
baseCore.rePutIntoR('Zhuanli:gwSocial_code', social_code)
log.info('已重新塞入redis')
continue
\ No newline at end of file
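# A compact sketch (not the committed code) of the proxy-then-direct fallback used in the year
# loop above: try the proxy a couple of times, then fall back to a direct request. It reuses the
# requests / baseCore / log objects of this script; the name fetch_with_proxy is hypothetical.
def fetch_with_proxy(url, headers, attempts=2, timeout=30):
    for _ in range(attempts):
        try:
            res = requests.get(url, headers=headers, proxies=baseCore.get_proxy(), timeout=timeout)
            if res.status_code == 200:
                log.info('fetched through proxy')
                return res
        except Exception:
            continue
    res = requests.get(url, headers=headers, timeout=timeout)
    log.info('fetched without proxy')
    return res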
@@ -2,7 +2,7 @@ from urllib.parse import urljoin
import langid
import pymysql
from gne import GeneralNewsExtractor
from retry import retry
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
@@ -65,8 +65,7 @@ class GoogleSpider(object):
self.wordsCode = wordsCode
self.sid = sid
self.threadId = threadId
# self.item = item
# self.bangdan_name = bangdan_name
self.rMonitor = redis.Redis(host='114.116.90.53', port=6380, password='RPHZgkDQ4zGJ', db=15)
def createDriver(self):
chrome_driver = self.config.get('selenium', 'chrome_driver')
@@ -82,17 +81,30 @@ class GoogleSpider(object):
def itemInsertToTable(self, items):
itemdata = []
conx, cursorM = self.connMysql()
companyinfo = self.item
social_code = str(companyinfo.split('|')[0])
ch_name = companyinfo.split('|')[1]
en_name = companyinfo.split('|')[2]
rank = self.bangdan_name + '|' + str(companyinfo.split('|')[3])
for item in items:
nowtime = self.getNowDate()
data = (social_code, en_name, ch_name, rank, item['title'], item['content'], item['detailurl'], item['publishtime'], item['source'], nowtime)
data = (self.sid, self.wordsCode, item['title'], item['detailurl'], item['source'], item['publishtime'],
item['content'], item['contentHtml'], '1', item['kword'], nowtime)
itemdata.append(data)
sql = "INSERT into Company_layoff (企业信用代码,企业英文名称,企业中文名称,所在榜单排名,标题,内容,链接,发布时间,来源,创建时间) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s)"
sql = "INSERT into google_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
self.logger.info("数据插入数据库成功!")
# define the INSERT statement
# execute the insert
conx.commit()
self.closeSql(conx, cursorM)
def listInsertToTable(self, item):
itemdata = []
conx, cursorM = self.connMysql()
# for item in items:
nowtime = self.getNowDate()
data = (self.sid, self.wordsCode, item['title'], item['detailUrl'], item['sourceTag'], item['publishTag'], '0',
item['kword'], nowtime)
itemdata.append(data)
sql = "INSERT into google_search_list (sid,wordsCode,title,detailurl,origin,publishdate,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
self.logger.info("数据插入数据库成功!")
# define the INSERT statement
@@ -205,7 +217,6 @@ class GoogleSpider(object):
raw_html = self.webDriver(url)
sm = SmartExtractor(lang)
article = sm.extract_by_html(raw_html, title)
# article = sm.extract_by_html(raw_html)
content = article.cleaned_text
contentWithTag = article.text
except Exception as e:
@@ -308,7 +319,8 @@ class GoogleSpider(object):
self.driver.find_element('xpath', '//div[@id="hdtb-tls"]').click()
time.sleep(2)
# self.driver.find_element('xpath', '//div[@class="hdtb-mn-hd"]/div[text()="按相关性排序"]').click()
self.driver.find_element('xpath', '//*[@id="tn_1"]/span[3]/g-popup/div[1]/div/div/div[text()="按相关性排序"]').click()
self.driver.find_element('xpath',
'//*[@id="tn_1"]/span[3]/g-popup/div[1]/div/div/div[text()="按相关性排序"]').click()
time.sleep(2)
# self.driver.find_element('xpath', '//div[@class="YpcDnf OSrXXb HG1dvd"]/a[text()="按日期排序"]').click()
self.driver.find_element('xpath', '//*[@id="lb"]/div/g-menu/g-menu-item[2]/div/a[text()="按日期排序"]').click()
@@ -323,6 +335,19 @@ class GoogleSpider(object):
repeatCounts = 0
for detail in lists:
durl = detail['detailUrl']
# publishTag=detail['publishTag']
# if publishTag:
# pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
# needDate='2024-09-30 00:00:00'
# needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
# needMaxDate = '2024-10-21 00:00:00'
# needMaxTime = datetime.datetime.strptime(needMaxDate, "%Y-%m-%d %H:%M:%S")
# if pubtime > needMaxTime:
# continue
#
# if pubtime < needTime:
# timeFlag = True
# break
is_member = self.r.sismember('pygoogle_' + self.wordsCode, durl)
if is_member:
repeatCounts += 1
@@ -330,7 +355,11 @@ class GoogleSpider(object):
self.logger.info(f"{self.searchkw}首页已存在50%以上,结束抓取")
return
continue
self.detailList.put(detail)
# self.detailList.put(detail)
try:
self.listInsertToTable(detail)
except Exception as e:
self.logger.info(f"Error: {str(e)}")
response = self.driver.page_source
html = etree.HTML(response)
@@ -341,8 +370,8 @@ class GoogleSpider(object):
hasnext = ''
timeFlag = False
while hasnext == '下一页':
# if self.page_num == 5:
# break
if self.page_num == 5:
break
self.page_num = self.page_num + 1
self.logger.info(f"{self.searchkw}...开始抓取第{self.page_num}页...")
try:
@@ -354,6 +383,7 @@ class GoogleSpider(object):
repeated_counts = 0
for detail in lists:
durl = detail['detailUrl']
detail['kword'] = self.searchkw
is_member = self.r.sismember('pygoogle_' + self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
@@ -363,14 +393,24 @@ class GoogleSpider(object):
return
continue
publishTag = detail['publishTag']
if publishTag:
pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
needDate = '2022-01-01 00:00:00'
needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
if pubtime < needTime:
timeFlag = True
break
self.detailList.put(detail)
# if publishTag:
# pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
# needDate='2024-09-30 00:00:00'
# needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
# needMaxDate = '2024-10-21 00:00:00'
# needMaxTime = datetime.datetime.strptime(needMaxDate, "%Y-%m-%d %H:%M:%S")
# if pubtime > needMaxTime:
# continue
#
# if pubtime < needTime:
# timeFlag = True
# break
# self.detailList.put(detail)
# todo: put it into the list table first
try:
self.listInsertToTable(detail)
except Exception as e:
self.logger.info(f"Error: {str(e)}")
if timeFlag:
break
try:
@@ -439,7 +479,7 @@ class GoogleSpider(object):
monitorDic = {
'lifecycle_data_crawler': monitor
}
self.baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
self.rMonitor.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
self.logger.info('数据监控发送Kafka失败,已放置Redis中')
# fetch the detail page
@@ -481,11 +521,10 @@ class GoogleSpider(object):
continue
processitem = self.getProcessitem(bdetail)
# uniqueCode = self.baseCore.getUniqueCode('GG', '195', self.threadId)
# processitem['uniqueCode'] = uniqueCode
uniqueCode = self.baseCore.getUniqueCode('GG', '195', self.threadId)
processitem['uniqueCode'] = uniqueCode
try:
# flg = self.sendkafka(processitem)
flg = True
flg = self.sendkafka(processitem)
if flg:
self.r.sadd('pygoogle_' + self.wordsCode, processitem['sourceAddress'])
# insert into the database
@@ -494,11 +533,11 @@ class GoogleSpider(object):
items.append(bdetail)
self.itemInsertToTable(items)
except Exception as e:
self.logger.info(f"插入数据库失败!{bdetail['kword']}===={e}")
# self.logger.info(f"放入kafka成功!{bdetail['kword']}===={detailUrl}")
# self.sendMonitor(processitem)
self.logger.info(f"插入数据库失败!{bdetail['kword']}===={detailUrl}")
self.logger.info(f"放入kafka成功!{bdetail['kword']}===={detailUrl}")
self.sendMonitor(processitem)
except Exception as e:
self.logger.info(f"{e}{bdetail['kword']}===={detailUrl}")
self.logger.info(f"放入kafka失败!{bdetail['kword']}===={detailUrl}")
# close the current new window
# self.driver.close()
@@ -167,7 +167,7 @@ if __name__ == '__main__':
print('---------------')
while True:
try:
codeids = ['KW-20220113-0004']
codeids = ['KW-20241021-0003']
for codeid in codeids:
try:
# keymsg=baiduTaskJob.getkafka()
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Awesome-pyecharts</title>
<script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts.min.js"></script>
</head>
<body >
<div id="9088f2762ffc48a5b9bc2d33ca7c777c" class="chart-container" style="width:900px; height:500px; "></div>
<script>
var chart_9088f2762ffc48a5b9bc2d33ca7c777c = echarts.init(
document.getElementById('9088f2762ffc48a5b9bc2d33ca7c777c'), 'white', {renderer: 'canvas'});
var option_9088f2762ffc48a5b9bc2d33ca7c777c = {
"animation": true,
"animationThreshold": 2000,
"animationDuration": 1000,
"animationEasing": "cubicOut",
"animationDelay": 0,
"animationDurationUpdate": 300,
"animationEasingUpdate": "cubicOut",
"animationDelayUpdate": 0,
"aria": {
"enabled": false
},
"color": [
"#5470c6",
"#91cc75",
"#fac858",
"#ee6666",
"#73c0de",
"#3ba272",
"#fc8452",
"#9a60b4",
"#ea7ccc"
],
"series": [
{
"type": "bar",
"name": "\u5546\u5bb6A",
"legendHoverLink": true,
"data": [
5,
20,
36,
10,
75,
85
],
"realtimeSort": false,
"showBackground": false,
"stackStrategy": "samesign",
"cursor": "pointer",
"barMinHeight": 0,
"barCategoryGap": "20%",
"barGap": "30%",
"large": false,
"largeThreshold": 400,
"seriesLayoutBy": "column",
"datasetIndex": 0,
"clip": true,
"zlevel": 0,
"z": 2,
"label": {
"show": true,
"margin": 8
}
}
],
"legend": [
{
"data": [
"\u5546\u5bb6A"
],
"selected": {}
}
],
"tooltip": {
"show": true,
"trigger": "item",
"triggerOn": "mousemove|click",
"axisPointer": {
"type": "line"
},
"showContent": true,
"alwaysShowContent": false,
"showDelay": 0,
"hideDelay": 100,
"enterable": false,
"confine": false,
"appendToBody": false,
"transitionDuration": 0.4,
"textStyle": {
"fontSize": 14
},
"borderWidth": 0,
"padding": 5,
"order": "seriesAsc"
},
"xAxis": [
{
"show": true,
"scale": false,
"nameLocation": "end",
"nameGap": 15,
"gridIndex": 0,
"inverse": false,
"offset": 0,
"splitNumber": 5,
"minInterval": 0,
"splitLine": {
"show": true,
"lineStyle": {
"show": true,
"width": 1,
"opacity": 1,
"curveness": 0,
"type": "solid"
}
},
"data": [
"\u886c\u886b",
"\u7f8a\u6bdb\u886b",
"\u96ea\u7eba\u886b",
"\u88e4\u5b50",
"\u9ad8\u8ddf\u978b",
"\u889c\u5b50"
]
}
],
"yAxis": [
{
"show": true,
"scale": false,
"nameLocation": "end",
"nameGap": 15,
"gridIndex": 0,
"inverse": false,
"offset": 0,
"splitNumber": 5,
"minInterval": 0,
"splitLine": {
"show": true,
"lineStyle": {
"show": true,
"width": 1,
"opacity": 1,
"curveness": 0,
"type": "solid"
}
}
}
]
};
chart_9088f2762ffc48a5b9bc2d33ca7c777c.setOption(option_9088f2762ffc48a5b9bc2d33ca7c777c);
</script>
</body>
</html>
test.png

78.9 KB

import json
import re
import requests
import time
from zhipuai import ZhipuAI
from base import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
def pre_zhipuai_http(content_list, prompt_abstract):
url = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
payload = json.dumps({
"model": "glm-4-flash",
"messages": [
{
"role": "user",
"content": prompt_abstract.replace('{content_list}', str(content_list))
}
]
})
headers = {
'Authorization': 'Bearer 6c8a99bde51835522a2af62ea71e6c0a.iOAOC3vaLYua2Rgr',
'Content-Type': 'application/json',
# 'Cookie': 'acw_tc=2760822c17247248406261468e6c541507ba9035f95078363549469047ee74'
}
# response = requests.request("POST", url, headers=headers, data=payload)
response = requests.post(url, headers=headers, data=payload, verify=True)  # payload is already a JSON string, so send it as the raw request body
response = response.json()
return response['choices'][0]['message']['content']
def pre_zhipuai(content, prompt_abstract):
# zhipu
client = ZhipuAI(
api_key="6c8a99bde51835522a2af62ea71e6c0a.iOAOC3vaLYua2Rgr") # 填写您自己的APIKey
response = client.chat.completions.create(
model="glm-4-flash", # 填写需要调用的模型名称
messages=[
{"role": "user",
"content": prompt_abstract.replace('{content}', str(content))
}
],
)
llm_response = response.choices[0].message.content
return llm_response
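# A usage sketch for the two call paths above; the prompt text and the wrapper name are
# illustrative only. prompt_abstract is assumed to contain the "{content}" placeholder that
# pre_zhipuai() substitutes (pre_zhipuai_http() expects "{content_list}" instead).
def demo_summarize(text):
    prompt_abstract = "请用一句话概括以下内容:{content}"  # illustrative prompt
    return pre_zhipuai(text, prompt_abstract)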
def get_data(excel_file_path):
import pandas as pd
try:
df = pd.read_excel(excel_file_path).astype(str)
dict_from_df = df.to_dict(
orient='records')  # convert the DataFrame into a list of dicts: each column name becomes a key, each row becomes one dict
data_list = []
for row in dict_from_df:
data_list.append(row)
return data_list
except Exception as e:
log.info(f"读取excel文件时出错:{e}")
return []
def clean_text(content):
# content = BeautifulSoup(content, "html.parser").text
content_ = content.replace('\n', '')
content_result = re.sub(r'\s+', ' ', content_)
# print(content_result)
return content_result
def gml2(area, content):
extract_prompt = "请阅读下文,概括总结有关{area}的问题挑战有哪些?不可回答文中没有出现的内容,若无{area}相关问题,请回答“文中未提及”。\n################\n{context}"
length_threshold = 2089
ptuning_url = "http://116.63.179.212:7861/local_doc_qa/async_chat"
llm_payload = json.dumps({
"question": extract_prompt.replace("{context}", content).replace("{area}", area)[:length_threshold],
"history": [],
"llm_params": {
"max_length": 2089,
"top_p": 0.7,
"temperature": 0.1
}
})
temp_problem_ = ""
max_retry = 2
retry_count = 0
while retry_count < max_retry:
try:
headers = {'Content-Type': 'application/json'}
ptuning_response = requests.post(ptuning_url, headers=headers, data=llm_payload, timeout=50)
temp_response = ptuning_response.text
temp_resp_json = json.loads(temp_response)
temp_problem_ = temp_resp_json["response"]
break
except Exception as e:
log.info(f"当前处理异常的内容为:{extract_prompt[:100]}")
log.info(e)
time.sleep(10) # 等待10秒后再次请求
retry_count += 1
continue
return temp_problem_
def write_excel(write_file_path, result_dic_list):
import os
from openpyxl import Workbook, load_workbook
dicts = result_dic_list
if dicts:  # make sure dicts is not empty
if os.path.exists(write_file_path):
wb = load_workbook(write_file_path)
ws = wb.active
else:
wb = Workbook()
ws = wb.active
keyword = dicts[0]
ws.append(list(keyword.keys()))
for dict_item in dicts:
if isinstance(dict_item, dict):
ws.append(list(dict_item.values()))
else:
ws.append(list([]))
wb.save(write_file_path)
else:
log.info("警告:dicts 列表为空")
if __name__ == "__main__":
start = time.time()
excel_file_path = "D:\kkwork\知识库服务\data\企业家精神-数据库汇总_11765_近三年.xlsx"
write_file_path = "D:\kkwork\知识库服务\data\企业家精神-数据库汇总_11765_近三年_result.xlsx"
data_list = get_data(excel_file_path)
new_list = []
area = "企业家精神"
for i in data_list:
content = i['正文']
content = clean_text(content)
response = gml2(area, content)
i["问题"] = response
log.info(response)
new_list.append(i)
write_excel(write_file_path, new_list)
# response['choices'][0]['message']['content']
end = time.time()
log.info(end - start)
# -*- coding: utf-8 -*-
import datetime
import os
import random
import sys
@@ -216,11 +217,16 @@ class BaseCore:
except :
pass
def __init__(self):
# connect to Redis
self.r = redis.Redis(host="114.116.90.53", port=6380, password='RPHZgkDQ4zGJ', db=6)
self.__cnx_proxy = pymysql.connect(host='1.95.78.131', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor()
self.r = redis.Redis(host='114.116.90.53', port=6380, password='RPHZgkDQ4zGJ', db=6)
self.cnx = pymysql.connect(host='1.95.78.131', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4')
self.cursor = self.cnx.cursor()
# the 114.116.44.11 database
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
self.cursor_ = self.cnx_.cursor()
pass
# measure elapsed time
@@ -291,11 +297,62 @@ class BaseCore:
def getRandomUserAgent(self):
return random.choice(self.__USER_AGENT_LIST)
# get and remove one element from a Redis list
def get_proxyIPPort(self):
ip_list = []
with self.__cursor_proxy as cursor:
sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
print(sql_str)
cursor.execute(sql_str)
rows = cursor.fetchall()
for row in tqdm(rows):
str_ip = row[0]
str_ip_list = str_ip.split('-')
proxy = {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
ip_list.append(proxy)
return ip_list
# get and remove one element from a Redis list
def redicPullData(self, key):
try:
self.r.ping()
except:
self.r = redis.Redis(host="114.116.90.53", port=6380, password='RPHZgkDQ4zGJ', db=6)
self.r = redis.Redis(host="114.115.236.206", port=6379, password='RPHZgkDQ4zGJ', db=6)
item = self.r.lpop(key)
return item.decode() if item else None
\ No newline at end of file
return item.decode() if item else None
def getSidName(self, sid):
sqlSelect = f"SELECT words_name FROM `key_words` WHERE id = '{sid}'"
self.cursor_.execute(sqlSelect)
data = self.cursor_.fetchone()[0]
return data
# get the PID of the running script
def getPID(self):
PID = os.getpid()
return PID
def getUniqueCode(self, abbr, serverId, threadId):
while True:
timeCode = self.r.blpop(['timeCode'], 2)
if timeCode:
timeCode = timeCode[1]
timeCode = timeCode.decode('utf-8')
break
else:
time.sleep(2)
pid = str(self.getPID())
if len(pid) < 4:
pid = pid.zfill(4)
elif len(pid) > 4:
pid = pid[0:4]
threadId = str(threadId)
if len(threadId) > 1:
threadId = threadId[0]
uniqueCode = abbr + str(datetime.datetime.now().strftime('%Y%m%d'))[2:] + serverId + pid + str(threadId) + str(timeCode)
return uniqueCode
\ No newline at end of file
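# A usage sketch for getUniqueCode(): the pieces concatenated above are
# abbr + YYMMDD + serverId + 4-digit zero-padded PID + 1-digit threadId + a timeCode popped
# from the 'timeCode' Redis list. The argument values below are illustrative, mirroring how
# the Google spider in this commit calls it with abbreviation 'GG' and server id '195'.
def example_unique_code(base_core):
    return base_core.getUniqueCode('GG', '195', 1)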
# -*- coding: utf-8 -*-
import datetime
import json
import time
import pandas as pd
import pymongo
import redis
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from baiduSpider import BaiduSpider
searchkw, wordsCode, sid = '', '', ''
baidu = BaiduSpider(searchkw, wordsCode, sid)
import urllib3
# r = redis.Redis(host="114.116.90.53", port=6380, password='RPHZgkDQ4zGJ', db=6)
# db_storage = pymongo.MongoClient('mongodb://1.95.69.135:27017/', username='admin', password='ZZsn@9988').ZZSN[
# '天眼查登录信息']
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# import sys
# sys.path.append('D:\\PycharmProjects\\zzsn\\base')
from baseCore import BaseCore
baseCore = BaseCore()
# 131
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# 11
cnx = baseCore.cnx_
cursor = baseCore.cursor_
log = baseCore.getLogger()
r = baseCore.r
def selectSql():
sql = """select * from google_search_list where state=0"""
cursor_.execute(sql)
return cursor_.fetchall()
def getNowDate():
# get the current time
current_time = datetime.datetime.now()
# convert the time to a string
currentdate = current_time.strftime("%Y-%m-%d %H:%M:%S")
return currentdate
def getProcessitem(bdetail):
nowDate = getNowDate()
if bdetail['content'] != '':
processitem = {
"sid": bdetail['sid'],
"source": "4",
"title": bdetail['title'],
"content": bdetail['content'],
"contentWithtag": bdetail['contentHtml'],
"origin": bdetail['origin'],
"publishDate": bdetail['publishDate'],
"sourceAddress": bdetail['detailurl'],
"createDate": nowDate
}
else:
processitem = {}
return processitem
def itemInsertToTable(item):
itemdata = []
nowtime = getNowDate()
data = (item['content'], item['contentHtml'], '1', nowtime, item['id'])
itemdata.append(data)
sql = "UPDATE google_search_list SET content=%s, content_with_tag=%s, state=%s, create_time=%s WHERE id=%s"
cursor_.executemany(sql, itemdata)
log.info("数据更新数据库成功!")
cnx_.commit()
def sendkafka(processitem):
producer = KafkaProducer(bootstrap_servers=['1.95.78.131:9092'])
try:
content = processitem['content']
publishDate = str(processitem['publishDate'])
title = processitem['title']
if title == '':
return
if content == '':
return
if publishDate == '':
return
kafka_result = producer.send("crawlerInfo", json.dumps(processitem, ensure_ascii=False).encode('utf8'))
# self.logger.info("数据发送kafka成功")
log.info(kafka_result.get(timeout=10))
flg = True
except Exception as e:
flg = False
pass
# self.logger.info('发送kafka异常')
finally:
producer.close()
return flg
def sendMonitor(processitem):
log.info(processitem['uniqueCode'])
sidName = baseCore.getSidName(processitem['sid'])
monitor = {
"title": processitem['title'], # 标题
"sourceAddress": processitem['sourceAddress'], # 原文链接
"uniqueCode": processitem['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": processitem['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 4, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['1.95.78.131:9092'], max_request_size=1024 * 1024 * 20,
api_version=(2, 7, 0))
try:
kafka_result = producer.send("crawlerInfo", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
# self.rMonitor.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败')
if __name__ == "__main__":
while True:
resultList = selectSql()
df_list = []
for result in resultList:
id_ = result[0]
title = result[1]
publishDate = result[2]
origin = result[3]
url = result[4]
sid = result[10]
wordsCode = result[11]
keyword = result[6]
try:
content, contentWithTag, title = baidu.extractorMsg(url, title)
contentWithTag = baidu.rmTagattr(contentWithTag, url)
except Exception as e:
content = ''
contentWithTag = ''
if len(content) < 100:
continue
soup = BeautifulSoup(contentWithTag, "html.parser")
# find all elements that carry a class attribute
elements_with_class = soup.find_all(class_=True)
# loop over the elements and remove their class attribute
for element in elements_with_class:
del element.attrs["class"]
contentHtml = str(soup)
detailmsg = {
'id': id_,
'sid': sid,
'title': title,
'detailurl': url,
'content': content,
'contentHtml': contentHtml,
'origin': origin,
'publishDate': publishDate
}
processitem = getProcessitem(detailmsg)
uniqueCode = baseCore.getUniqueCode('GG', '195', '1')
processitem['uniqueCode'] = uniqueCode
try:
flg = sendkafka(processitem)
if flg:
r.sadd('pygoogle_' + wordsCode, processitem['sourceAddress'])
# insert into the database
try:
itemInsertToTable(detailmsg)
except Exception as e:
log.info(f"插入数据库失败!{keyword}===={url}")
log.info(f"放入kafka成功!{keyword}===={url}")
sendMonitor(processitem)
except Exception as e:
log.info(f"放入kafka失败!{keyword}===={url}")
# df_list.append(detailmsg)
# df = pd.DataFrame(df_list)
# df.to_excel("./测试结果.xlsx", index=False)
time.sleep(1)
\ No newline at end of file
from baiduSpider import BaiduSpider
@@ -5,8 +5,8 @@ import requests
# url = 'https://baijiahao.baidu.com/s?id=1784907851792547880&wfr=spider&for=pc'
# url = 'https://www.thepaper.cn/newsDetail_forward_26661172'
url = 'https://finance.huanqiu.com/article/9CaKrnK5O7o' # 澎湃新闻 虎嗅APP 经济观察网
title = '中国建材集团董事长宋志平:激发和保护企业家精神'
url = 'https://www.ctnews.com.cn/huanqiu/content/2024-10/08/content_165713.html' # 澎湃新闻 虎嗅APP 经济观察网
title = '中国味道——大运河美食体验工作坊亮相尼日利亚'
try:
detailurl = url
title = title