Commit a512c71f  Author: 刘伟刚

Code change commit

Parent c52051f8
import configparser
import os
import glob

# Build the path of the aggregate statistics file
def getTotalFileNamePath(path, year, startMonth, endMonth, field_name, iEType_name):
    end_str = "{:02d}".format(endMonth)
    if startMonth < endMonth:
        filePathname = path + str(year) + '\\' + end_str + '\\累计\\'
        # e.g. 202301-07--收发货地址--进口.csv
        fileName = str(year) + '01-' + end_str + '--' + field_name + '--' + iEType_name + '.csv'
    if startMonth == endMonth:
        filePathname = path + str(year) + '\\' + end_str + '\\单月\\'
        fileName = str(year) + end_str + '--' + field_name + '--' + iEType_name + '.csv'
    fpathName = os.path.join(filePathname, fileName)
    return fpathName

# Build the glob pattern that matches the detail data files of one statistic
def getFileNamePath(path, year, startMonth, endMonth, field_name, iEType_name):
    end_str = "{:02d}".format(endMonth)
    if startMonth < endMonth:
        filePathname = path + str(year) + '\\' + end_str + '\\累计\\' + field_name + '\\'
        # e.g. 202301-07--收发货地址--出口-11.csv
        fileName = str(year) + '01-' + end_str + '--' + field_name + '--' + iEType_name + '-*.csv'
    if startMonth == endMonth:
        filePathname = path + str(year) + '\\' + end_str + '\\单月\\' + field_name + '\\'
        # e.g. 202307--收发货地址--出口-11.csv
        fileName = str(year) + end_str + '--' + field_name + '--' + iEType_name + '-*.csv'
    inFileName = os.path.join(filePathname, fileName)
    return inFileName
import pandas as pd

# Read the aggregate CSV and normalise the 美元 (USD) column to float
def readTotalCsv(path):
    df = pd.read_csv(path, encoding='gbk', dtype=str)
    df['美元'] = df['美元'].str.replace(',', '').astype(float)
    return df

# Read one detail CSV; return its code id and the sum of its 美元 column
def readDetailCsv(path):
    ddf = pd.read_csv(path, encoding='gbk', dtype=str)
    column_sum = ddf['美元'].str.replace(',', '').astype(float).sum()
    codeId = ddf.iloc[0, 0]
    return codeId, column_sum
# Compare each detail file's USD total against the matching row of the aggregate file
def test():
    path = 'D:\\hg\\'
    years = '2022'
    endMonths = '12'
    endMonths = int(endMonths)
    startMonths = [1, endMonths]
    field_names = ['贸易伙伴', '收发货地址', '贸易方式']
    iEType_names = ['进口', '进出口', '出口']
    errorFile = []
    conError = []
    for startMonth in startMonths:
        for field_name in field_names:
            for iEType_name in iEType_names:
                fpathName = getTotalFileNamePath(path, years, startMonth, endMonths, field_name, iEType_name)
                pdAll = readTotalCsv(fpathName)
                inFileName = getFileNamePath(path, years, startMonth, endMonths, field_name, iEType_name)
                listDetailFile = glob.glob(inFileName, recursive=True)
                for detailFile in listDetailFile:
                    try:
                        codeId, column_sum = readDetailCsv(detailFile)
                        try:
                            if field_name == '收发货地址':
                                row = pdAll.loc[pdAll['注册地编码'] == codeId]
                            else:
                                row = pdAll.loc[pdAll[field_name + '编码'] == codeId]
                            try:
                                usvalue = row.at[row.index[-1], '美元']
                                if usvalue == column_sum:
                                    continue
                                else:
                                    msgList = [detailFile, column_sum, usvalue]
                                    errorFile.append(msgList)
                            except Exception as e:
                                print(f'错误原因e{e} {detailFile}')
                        except Exception as e2:
                            print(f'错误原因e2{e2} {detailFile}')
                    except Exception as e3:
                        # downloads whose content is corrupted
                        conError.append(detailFile)
                        print(f'错误原因e3{e3} {detailFile}')
    # convert the mismatch list to a DataFrame and save it as CSV
    df = pd.DataFrame(errorFile, columns=['有问题的文件', '计算的值', '网站的值'])
    df.to_csv(years + str(endMonths) + 'data.csv', index=False)
    # convert the corrupted-download list to a DataFrame and save it as CSV
    df = pd.DataFrame(conError, columns=['下载内容异常'])
    df.to_csv(years + str(endMonths) + '文件错误.csv', index=False)
    return errorFile
if __name__ == '__main__':
    errorFileList = test()
    # config = configparser.ConfigParser()
    # config.read('config.ini')
    # path = config.get('param', 'path')
    # years = config.get('param', 'year')
    # endMonths = config.get('param', 'endMonth')
    # startMonths = ['1', endMonths]
    # field_names = ['贸易伙伴', '收发货地址', '贸易方式']
    # iEType_names = ['进口', '进出口', '出口']
    # for startMonth in startMonths:
    #     for field_name in field_names:
    #         for iEType_name in iEType_names:
    #             getFileNamePath(path, years, startMonth, endMonths, field_name, iEType_name)
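The commented-out block above reads its parameters from a config.ini. A minimal sketch of what that file would need for this script, assuming only the [param] section referenced here (the values are illustrative):

[param]
path = D:\hg\
year = 2022
endMonth = 12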
@@ -10,7 +10,6 @@ import pandas as pd
 import redis
 import requests
 from datetime import datetime
 '''
 Customs download data types and parameter combinations
 CODE_TS # commodity code  ORIGIN_COUNTRY # trade partner  TRADE_MODE # trade mode  TRADE_CO_PORT # consignor/consignee address
@@ -44,6 +43,10 @@ d:/hg/2023/7/
 1) Group by type and fetch the latest monthly code list for each group
 2) Download the corresponding statistics by field code and commodity
 3) Download the data by commodity code
+6. Add file-content verification
+   1) Read the aggregate statistics file
+   2) Read the downloaded file's data
+   3) Compare the computed values and delete the file if they do not match
 '''
 class HgDownFile(object):
@@ -212,11 +215,20 @@ class HgDownFile(object):
     # parameter setup for a single field
     def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
-        if year==2022 and endMonth==1:
-            selectTableState= 1 # data before 202201 uses 2, later data uses 1
-        else:
-            selectTableState= 2 # data before 202201 uses 2, later data uses 1
+        # 2022 1-1 202202 -> 2: data before 2022-01 uses 2
+        # 2022 1-2 202202 -> 3: 2022 cumulative data uses 3
+        # 2022 2-2 202202 -> 1: data after 2022-01 uses 1
+        if year<2022:
+            selectTableState= 2 # data before 202202 uses 2, later data uses 1
+        else:
+            s=int(startMonth)
+            e=int(endMonth)
+            if year==2022 and s<e: # 2022 cumulative data needs its own table state
+                selectTableState= 3
+            elif year==2022 and e==1:
+                selectTableState= 2
+            else:
+                selectTableState= 1 # data before 202202 uses 2, later data uses 1
         param={
             'pageSize': 10,
             'iEType': iEType,
@@ -238,17 +250,24 @@ class HgDownFile(object):
             'outerValue4':'',
             'orderType': 'CODE ASC DEFAULT',
             'selectTableState': selectTableState, # data before 202202 uses 2, later data uses 1
-            'currentStartTime': '202201',
+            'currentStartTime': '202202',
         }
         return param
     # parameter setup for combined-field queries
     def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
-        if year==2022 and endMonth==1:
-            selectTableState= 1 # data before 202201 uses 2, later data uses 1
-        else:
-            selectTableState= 2 # data before 202201 uses 2, later data uses 1
+        if year<2022:
+            selectTableState= 2 # data before 202202 uses 2, later data uses 1
+        else:
+            s=int(startMonth)
+            e=int(endMonth)
+            if year==2022 and s<e: # 2022 cumulative data needs its own table state
+                selectTableState= 3
+            elif year==2022 and e==1:
+                selectTableState= 2
+            else:
+                selectTableState= 1 # data before 202202 uses 2, later data uses 1
         param={
             'pageSize': 10,
             'iEType': iEType,
@@ -270,7 +289,7 @@ class HgDownFile(object):
             'outerValue4':'',
             'orderType': 'CODE ASC DEFAULT',
             'selectTableState': selectTableState,
-            'currentStartTime': '202201',
+            'currentStartTime': '202202',
         }
         return param
@@ -314,12 +333,11 @@ class HgDownFile(object):
         outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
         # outerFields=['CODE_TS']
         currencyType='usd'
-        # endMonth=self.r.get('newMonth')
-        # endMonth=int(endMonth.decode('utf-8'))
-        # endMonth=int(self.config.get('param', 'endMonth'))
-        # if endMonth != (month-1):
-        #     return
-        startMonths=[endMonth]
+        if endMonth==1:
+            startMonths=[1]
+        else:
+            startMonths=[1,endMonth]
         for startMonth in startMonths:
             for iEType in iETypes:
                 for outerField1 in outerFields:
@@ -346,19 +364,59 @@ class HgDownFile(object):
                         print(f'文件已存在{filePathName}')
                         codeFileList.append(filePathName)
                         continue
+                    # download the data and get back a temp file
                     tmpfilename=self.reqDownFile(param)
-                    saveFileName=self.tmpToFile(tmpfilename,filePathName)
-                    print(saveFileName)
-                    codeFileList.append(saveFileName)
+                    # verify the temp file's totals against the aggregate statistics file
+                    flagg=self.verifyFile(tmpfilename,year,startMonth,endMonth,outerField1,iEType,currencyType)
+                    # copy the temp file's data into the target file
+                    if flagg:
+                        saveFileName=self.tmpToFile(tmpfilename,filePathName)
+                        print(saveFileName)
+                        codeFileList.append(saveFileName)
+                    else:
+                        os.remove(tmpfilename)
         return codeFileList
+
+    # check one downloaded temp file against the aggregate statistics file
+    def verifyFile(self,tmpfilename,year,startMonth,endMonths,outerField1,iEType,currencyType):
+        flag=False
+        path='D:\\hg\\'
+        years=year
+        end_str=int(endMonths)
+        startMonths=startMonth
+        if startMonths<=end_str:
+            filePathName=self.filepath(iEType,currencyType,year,startMonth,end_str,outerField1)
+            try:
+                dfAll = pd.read_csv(filePathName, encoding='gbk',dtype=str)
+                dfAll['美元'] = dfAll['美元'].str.replace(',', '').astype(float)
+                ddf = pd.read_csv(tmpfilename, encoding='gbk',dtype=str)
+                column_sum = ddf['美元'].str.replace(',', '').astype(float).sum()
+                codeId=ddf.iloc[0, 0]
+                fieldNm=''
+                if 'ORIGIN_COUNTRY' in outerField1:
+                    fieldNm='贸易伙伴'
+                if 'TRADE_MODE' in outerField1:
+                    fieldNm='贸易方式'
+                if 'TRADE_CO_PORT' in outerField1:
+                    fieldNm='收发货地址'
+                if fieldNm=='收发货地址':
+                    row =dfAll.loc[dfAll['注册地编码']==codeId]
+                else:
+                    row =dfAll.loc[dfAll[fieldNm+'编码']==codeId]
+                try:
+                    usvalue = row.at[row.index[-1], '美元']
+                    if usvalue==column_sum:
+                        flag=True
+                except Exception as e:
+                    print(e)
+            except Exception as e22:
+                print(e22)
+        return flag
     def codeFieldDown(self,fieldFileList,year,endMonth):
         current_date = datetime.now()
         # year = current_date.year
         year = int(year)
-        # endMonth=self.r.get('newMonth')
-        # endMonth=int(endMonth.decode('utf-8'))
-        # endMonth=int(self.config.get('param', 'endMonth'))
         for fieldFile in fieldFileList:
             #['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
             try:
@@ -411,7 +469,11 @@ if __name__ == '__main__':
         fieldFileList=hgDownFile.field1Down(year,endMonth)
         if len(fieldFileList)>= 12:
             break
+    else:
+        while len(fieldFileList)< 24:
+            fieldFileList=hgDownFile.field1Down(year,endMonth)
+            if len(fieldFileList)>= 24:
+                break
     for i in range(1,3):
         print('_______________')
         hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
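The selectTableState rule is repeated in both setparam methods of this commit (and again in the detail-download script below). A consolidated helper expressing the same decision table is sketched here; the function name is hypothetical and not part of the commit:

def select_table_state(year, startMonth, endMonth):
    # Pre-2022 data lives in table state 2; 2022 cumulative ranges use 3;
    # the single month 2022-01 also uses 2; everything later uses 1.
    if year < 2022:
        return 2
    if year == 2022 and int(startMonth) < int(endMonth):
        return 3
    if year == 2022 and int(endMonth) == 1:
        return 2
    return 1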
@@ -11,6 +11,8 @@ import redis
 import requests
 from datetime import datetime
 from logRecord import LogRecord
+import traceback
 '''
 Customs commodity detail download flow
 1. Download the commodity codes
@@ -225,10 +227,18 @@ class HgDownFile(object):
     # parameter setup for a single field
     def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
-        if year>2021:
-            selectTableState= 1 # data before 202201 uses 2, later data uses 1
-        else:
-            selectTableState= 2 # data before 202201 uses 2, later data uses 1
+        if year<2022:
+            selectTableState= 2 # data before 202202 uses 2, later data uses 1
+        else:
+            s=int(startMonth)
+            e=int(endMonth)
+            if year==2022 and s<e: # 2022 cumulative data needs its own table state
+                selectTableState= 3
+            elif year==2022 and e==1:
+                selectTableState= 2
+            else:
+                selectTableState= 1 # data before 202202 uses 2, later data uses 1
+        outerValue1='87036011,87036012,87036013,87036019,87036021,87036022,87036023,87036029,87036031,87036032,87036033,87036039,87036041,87036042,87036043,87036049,87036051,87036052,87036053,87036059,87036061,87036062,87036063,87036069,87036071,87036072,87036073,87036079,87037011,87037012,87037013,87037019,87037021,87037022,87037023,87037029,87037031,87037032,87037033,87037039,87037041,87037042,87037043,87037049,87037051,87037052,87037053,87037059,87037061,87037062,87037063,87037069,87037071,87037072,87037073,87037079,40111000,40112000,40121100,40121200,40122010,40129020,40131000,70071190,70072190,70091000,85229091,85269110,85272100,85272900,85392130,85392930,94019910,28046117,28046119,38180011,38180019,85044030,85414200,85414300,84723090,84729040,85258120,85258220,85258320,85258921,85258922,85258923,85258929,85286210,85286220,85286290,85286910,85286990,90065310,90065390,90065930,90065941,90065949'
         param={
             'pageSize': 10,
             'iEType': iEType,
@@ -244,22 +254,29 @@ class HgDownFile(object):
             'outerField2':'',
             'outerField3':'',
             'outerField4':'',
-            'outerValue1':'',
+            'outerValue1':outerValue1,
             'outerValue2':'',
             'outerValue3':'',
             'outerValue4':'',
             'orderType': 'CODE ASC DEFAULT',
             'selectTableState': selectTableState, # data before 202202 uses 2, later data uses 1
-            'currentStartTime': '202201', # January 2022 data needs separate handling
+            'currentStartTime': '202202', # January 2022 data needs separate handling
         }
         return param
     # parameter setup for combined-field queries
     def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
-        if year>2021:
-            selectTableState= 1 # data before 202201 uses 2, later data uses 1
-        else:
-            selectTableState= 2 # data before 202201 uses 2, later data uses 1
+        if year<2022:
+            selectTableState= 2 # data before 202202 uses 2, later data uses 1
+        else:
+            s=int(startMonth)
+            e=int(endMonth)
+            if year==2022 and s<e: # 2022 cumulative data needs its own table state
+                selectTableState= 3
+            elif year==2022 and e==1:
+                selectTableState= 2
+            else:
+                selectTableState= 1 # data before 202202 uses 2, later data uses 1
         param={
             'pageSize': 10,
             'iEType': iEType,
@@ -281,7 +298,7 @@ class HgDownFile(object):
             'outerValue4':'',
             'orderType': 'CODE ASC DEFAULT',
             'selectTableState': selectTableState,
-            'currentStartTime': '202201',
+            'currentStartTime': '202202',
         }
         return param
@@ -302,6 +319,20 @@ class HgDownFile(object):
             os.remove(tmpfilename)
         return filePathName

+    # check whether the temp CSV is close to the 10,000-row export cap
+    def tmpFileLength(self,tmpfilename):
+        flag=True
+        # open the csv file
+        with open(tmpfilename, 'r') as file:
+            # create a csv reader
+            csv_reader = csv.reader(file)
+            # count the rows with len()
+            line_count = len(list(csv_reader))
+            if line_count > 9990:
+                print('csv文件行数过大需要对编码进行拆分')
+                flag=False
+        return flag
     def readcsv(self,filePath):
         codes=[]
@@ -369,8 +400,19 @@ class HgDownFile(object):
                     codeFileList.append(codeFileMsg)
                     continue
                 tmpfilename=self.reqDownFile(param)
-                # copy the downloaded temp file into the target file
-                saveFileName=self.tmpToFile(tmpfilename,filePathName)
+                fflag=self.tmpFileLength(tmpfilename)
+                # check whether the row count is close to 10k
+                if fflag: # under 10k: keep the data
+                    # verify the temp file's totals against the aggregate statistics file
+                    flagg=self.verifyFile(tmpfilename,year,startMonth,endMonth,outerField1,iEType,currencyType)
+                    # copy the temp file's data into the target file
+                    if flagg:
+                        # copy the downloaded temp file into the target file
+                        saveFileName=self.tmpToFile(tmpfilename,filePathName)
+                    else:
+                        saveFileName=''
+                else:
+                    saveFileName=''
                 # when the file is close to 10k rows, split the codes and download again
                 if saveFileName=='':
                     cds=code.split(',')
@@ -380,6 +422,7 @@ class HgDownFile(object):
                         # assemble the parameters
                         param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
                         # build the file path for these parameters
+
                         filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,filecodes)
                         if os.path.exists(filePathName):
                             print(f'文件已存在{filePathName}')
@@ -390,14 +433,54 @@ class HgDownFile(object):
                             codeFileList.append(codeFileMsg)
                             continue
                         tmpfilename=self.reqDownFile(param)
-                        # copy the downloaded temp file into the target file
-                        saveFileName=self.tmpToFile(tmpfilename,filePathName)
+                        # verify the temp file's totals against the aggregate statistics file
+                        flagg=self.verifyFile(tmpfilename,year,startMonth,endMonth,outerField1,iEType,currencyType)
+                        # copy the temp file's data into the target file
+                        if flagg:
+                            # copy the downloaded temp file into the target file
+                            saveFileName=self.tmpToFile(tmpfilename,filePathName)
+                        # # copy the downloaded temp file into the target file
+                        # saveFileName=self.tmpToFile(tmpfilename,filePathName)
                         print(saveFileName)
                         codeFileList.append(saveFileName)
         filemsg=self.codeFilepathMsg(iEType,currencyType,year,startMonth,endMonth,outerField1)
         return codeFileList,filemsg
+    # check a detail temp file against the commodity-code aggregate file
+    def verifyFile(self,tmpfilename,year,startMonth,endMonths,outerField1,iEType,currencyType):
+        flag=False
+        path='D:\\hg\\'
+        years=year
+        end_str=int(endMonths)
+        startMonths=startMonth
+        if startMonths<=end_str:
+            try:
+                filePathName=self.filepath(iEType,currencyType,year,startMonth,end_str,outerField1)
+            except :
+                traceback.print_exc()
+            try:
+                dfAll = pd.read_csv(filePathName, encoding='gbk',dtype=str)
+                dfAll['美元'] = dfAll['美元'].str.replace(',', '').astype(float)
+                ddf = pd.read_csv(tmpfilename, encoding='gbk',dtype=str)
+                ddf['美元']=ddf['美元'].str.replace(',', '').astype(float)
+                column_sum = ddf.groupby('商品编码')['美元'].sum()
+                sumList=column_sum.reset_index().values.tolist()
+                for codesum in sumList:
+                    codeId=codesum[0]
+                    cvalue=codesum[1]
+                    row =dfAll.loc[dfAll['商品编码']==codeId]
+                    try:
+                        usvalue = row.at[row.index[-1], '美元']
+                        if usvalue==cvalue:
+                            flag=True
+                    except Exception as e:
+                        print(e)
+            except Exception as e22:
+                print(e22)
+        return flag
     # assemble the detail commodity download parameters
     def codeFieldDown(self,fieldFileList,year,endMonth):
         current_date = datetime.now()
@@ -427,7 +510,8 @@ class HgDownFile(object):
                 codeFileList,filemsg=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
             except Exception as e:
-                print(e)
+                # print(e)
+                traceback.print_exc()
             continue
         return codeFileList,filemsg
@@ -486,6 +570,3 @@ if __name__ == '__main__':
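tmpFileLength above flags exports that hit the site's roughly 10,000-row cap, after which fieldCodeDown re-requests the commodity codes in smaller batches (cds=code.split(',')). A sketch of that splitting step, with a hypothetical helper name and an assumed batch size:

def split_codes(code_str, batch_size=50):
    # Turn one comma-joined commodity-code string into smaller
    # comma-joined batches so each export stays under the row cap.
    codes = code_str.split(',')
    return [','.join(codes[i:i + batch_size])
            for i in range(0, len(codes), batch_size)]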
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging
import getpass
import sys

class MyLog(object):
    def __init__(self):
        """Create a logger for the current user."""
        user = getpass.getuser()
        self.logger = logging.getLogger(user)
        self.logger.setLevel(logging.DEBUG)
        logFile = sys.argv[0][0:-3] + '.log'  # log file name
        formatter = logging.Formatter('%(asctime)-12s %(levelname)-8s %(name)-10s %(message)-12s')
        # log to the screen and to the log file
        logHand = logging.FileHandler(logFile)
        logHand.setFormatter(formatter)
        logHand.setLevel(logging.ERROR)  # only errors are written to the file
        logHandSt = logging.StreamHandler()
        logHandSt.setFormatter(formatter)
        self.logger.addHandler(logHand)
        self.logger.addHandler(logHandSt)

    # the five log levels map to the five methods below
    def debug(self, msg):
        self.logger.debug(msg)

    def info(self, msg):
        self.logger.info(msg)

    def warn(self, msg):
        self.logger.warn(msg)

    def error(self, msg):
        self.logger.error(msg)

    def critical(self, msg):
        self.logger.critical(msg)

if __name__ == '__main__':
    mylog = MyLog()
    mylog.debug("我是一个debug")
    mylog.info("我是一个info")
    mylog.warn("我是一个warn")
    mylog.error("我是一个error")
    mylog.critical("我是一个critical")
@@ -93,7 +93,7 @@ def paserList(searchmsg,social_code):
         storyPath='https://cn.tradingview.com'+item['storyPath']
         published=item['published']
         published=getFormatedate(published)
-        log.info(f'信用代码{social_code}的资讯列表---{storyPath}')
         # duplicate check
         flag=selectLinkMsg(storyPath,social_code)
         if flag:
@@ -121,7 +121,10 @@ def paserList(searchmsg,social_code):
         # sourceAddress=storyPath
         sourceAddress=storyPath
         content,contentWithTag=extractorMsg(sourceAddress,title)
+
         if content:
+            if len(content)<150:
+                continue
             time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
             detailmsg={
                 'content': content,
@@ -214,7 +217,7 @@ def conn144():
 def getStockFromSql():
     conn,cursor=conn144()
     # check whether the record exists
-    select_sql=f"SELECT ticker,exchange,xydm FROM mgzqyjwyh_list "
+    select_sql=f"SELECT ticker,exchange,xydm FROM mgzqyjwyh_list where xydm='ZZSN22080900000001' "
     cursor.execute(select_sql)
     gn_result = cursor.fetchall()
     conn.commit()
@@ -313,20 +316,23 @@ if __name__ == '__main__':
     # url='https://news-headlines.tradingview.com/v2/headlines?client=web&lang=zh-Hans&symbol=NASDAQ%3AAAPL'
     # searchmsg=reqmsg(url)
     # print(searchmsg)
-    # getStockFromSql()
-    while True:
-        try:
-            tradview_ticker=r.lpop('tradview_ticker')
-            if tradview_ticker:
-                tradviewticker = tradview_ticker.decode(errors='ignore')
-                ticker_param=str(tradviewticker).split('_')[0]
-                social_code=str(tradviewticker).split('_')[1]
-                url=f'https://news-headlines.tradingview.com/v2/headlines?client=web&lang=zh-Hans&symbol={tradview_ticker}'
-                searchmsg=reqmsg(url)
-                paserList(searchmsg,social_code)
-        except Exception as e:
-            log.info(f'redis中获取企业信息为空{e}')
-            break
+    getStockFromSql()
+    # while True:
+    #     try:
+    #         tradview_ticker=r.lpop('tradview_ticker')
+    #         if tradview_ticker:
+    #
+    #             tradviewticker = tradview_ticker.decode(errors='ignore')
+    #             log.info(f'采集资讯的企业{tradviewticker}')
+    #             ticker_param=str(tradviewticker).split('_')[0]
+    #             social_code=str(tradviewticker).split('_')[1]
+    #             url=f'https://news-headlines.tradingview.com/v2/headlines?client=web&lang=zh-Hans&symbol={ticker_param}'
+    #             log.info(f'采集资讯企业列表地址{tradview_ticker}')
+    #             searchmsg=reqmsg(url)
+    #             paserList(searchmsg,social_code)
+    #     except Exception as e:
+    #         log.info(f'redis中获取企业信息为空{e}')
+    #         break
...
@@ -50,8 +50,8 @@ if __name__=="__main__":
     opt.add_experimental_option("excludeSwitches", ["enable-automation"])
     opt.add_experimental_option('excludeSwitches', ['enable-logging'])
     opt.add_experimental_option('useAutomationExtension', False)
-    opt.binary_location = r'D:/Google/Chrome/Application/chrome.exe'
-    chromedriver = r'D:/cmd100/chromedriver.exe'
+    opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
+    chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
     browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
     url = "https://mp.weixin.qq.com/"
     browser.get(url)
...
import os
import redis
from flask import Flask, request, send_file, render_template, jsonify
import json
import pymysql
from pyquery import PyQuery as pq
from flask_cors import cross_origin

'''
Manually capture request data and parse it:
use fiddler to forward the page data of a link to this backend, which parses the data.
The header and cookie information of the browser visit are obtained through fiddler.
'''
r = redis.Redis(host='127.0.0.1', port='6379', db=0)
app = Flask(__name__)

@app.route('/')
@cross_origin()
def index():
    return 'Welcome to the website!'

@app.route('/get_hold', methods=['POST'])
@cross_origin()
def get_news():
    data = request.form
    # a Flask view must return a response; echo the captured form data
    return jsonify(dict(data))

@app.route('/task/setCookie', methods=['GET'])
# @cross_origin()
def setCookie():
    try:
        cookie = request.args.get('cookie')
        r.sadd('hgcookie', cookie)
    except Exception as e:
        print('error')
    return 'success'

@app.route('/task/getCookieSize', methods=['GET'])
@cross_origin()
def getCookieSize():
    try:
        size = r.scard('hgcookie')
        data = {
            "code": 200,
            "msg": "操作成功",
            "data": size
        }
    except Exception as e:
        data = {
            "code": 200,
            "msg": "操作失败",
            "data": 0
        }
    return jsonify(data)

if __name__ == '__main__':
    app.run(port=8003)
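For reference, the two task endpoints can be exercised like this; host and port assume the app.run(port=8003) default above, and the cookie value is illustrative:

import requests

print(requests.get('http://127.0.0.1:8003/task/setCookie',
                   params={'cookie': 'SESSION=abc123'}).text)   # -> 'success'
print(requests.get('http://127.0.0.1:8003/task/getCookieSize').json())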
@@ -277,8 +277,8 @@ class GoogleSpider(object):
         self.logger.info("开始抓取首页..." + self.searchkw )
         time.sleep(5)
         flag, lists = self.parse_page()
-        if len(lists):
-            time.sleep(600)
+        if len(lists)<1:
+            time.sleep(6)
         for detail in lists:
             durl=detail['detailUrl']
             is_member = self.r.sismember('pygoogle_'+self.wordsCode, durl)
@@ -292,7 +292,7 @@ class GoogleSpider(object):
         hasnext = hasnext.strip()
         timeFlag = False
         while hasnext == '下一页':
-            if self.page_num==29:
+            if self.page_num==5:
                 break
             self.page_num = self.page_num + 1
             self.logger.info("开始抓取第%s页..." % self.page_num)
...
@@ -113,6 +113,8 @@ class GoogleTaskJob(object):
         id=keymsg['id']
         try:
             searchEngines=keymsg['searchEngines']
+            if 'java.util.ArrayList' in searchEngines:
+                searchEngines=searchEngines[1]
         except Exception as e:
             searchEngines=[]
         kwList=[]
@@ -127,17 +129,6 @@ class GoogleTaskJob(object):
                     'sid':id
                 }
                 kwList.append(kwmsg)
-        else:
-            logger.info('+++++')
-            keyword=keymsg['keyWord']
-            keymsglist=self.getkeywords(keyword)
-            for kw in keymsglist:
-                kwmsg={
-                    'kw':kw,
-                    'wordsCode':wordsCode,
-                    'sid':id
-                }
-                kwList.append(kwmsg)
         return kwList

     def runSpider(self,kwmsg):
@@ -175,8 +166,7 @@ if __name__ == '__main__':
         try:
             codeids=[]
             # codeid='KW-20230727-0001'
-            codeids.append('KW-20230814-0001')
-            codeids.append('KW-20230814-0005')
+            codeids.append('KW-20230925-0002')
             for codeid in codeids:
                 try:
                     # keymsg=baiduTaskJob.getkafka()
...
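The new 'java.util.ArrayList' check appears to unwrap values that a Java producer serialized with a class hint, i.e. a payload shaped like ["java.util.ArrayList", [...]] whose real list sits at index 1; that shape is an assumption inferred from searchEngines=searchEngines[1]. A standalone sketch:

def unwrap_java_list(value):
    # assumed shape: ["java.util.ArrayList", ["baidu", "google"]] -> ["baidu", "google"]
    if isinstance(value, list) and value and value[0] == 'java.util.ArrayList':
        return value[1]
    return value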
@@ -215,7 +215,7 @@ class BaseCore:
         except :
             pass
     def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                                            charset='utf8mb4')
         self.__cursor_proxy= self.__cnx_proxy.cursor()
         pass
@@ -310,27 +310,28 @@ class BaseCore:
             proxy_list.append(proxy)
         return proxy_list[random.randint(0, 3)]

-    def get_proxy(self):
-        ip_list = []
-        with self.__cursor_proxy as cursor:
-            sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
-            print(sql_str)
-            cursor.execute(sql_str)
-            rows = cursor.fetchall()
-            for row in tqdm(rows):
-                str_ip = row[0]
-                str_ip_list = str_ip.split('-')
-                proxyMeta = "http://%(host)s:%(port)s" % {
-                    "host": str_ip_list[0],
-                    "port": str_ip_list[1],
-                }
-                proxy = {
-                    "HTTP": proxyMeta,
-                    "HTTPS": proxyMeta
-                }
-                ip_list.append(proxy)
-        return ip_list
+    # def get_proxy(self):
+    #     ip_list = []
+    #     with self.__cursor_proxy as cursor:
+    #         sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
+    #         print(sql_str)
+    #         cursor.execute(sql_str)
+    #         rows = cursor.fetchall()
+    #         for row in tqdm(rows):
+    #             str_ip = row[0]
+    #             str_ip_list = str_ip.split('-')
+    #             proxyMeta = "http://%(host)s:%(port)s" % {
+    #                 "host": str_ip_list[0],
+    #                 "port": str_ip_list[1],
+    #             }
+    #             proxy = {
+    #                 "HTTP": proxyMeta,
+    #                 "HTTPS": proxyMeta
+    #             }
+    #             ip_list.append(proxy)
+    #
+    #     return ip_list

     def get_proxyIPPort(self):
         ip_list = []
         with self.__cursor_proxy as cursor:
...
import requests
import json
from openpyxl import Workbook
import time
import hashlib
import os
import datetime

# may change over time
start_url = 'https://www.toutiao.com/api/pc/feed/?min_behot_time=0&category=news_hot&utm_source=toutiao&widen=1&max_behot_time='
url = 'https://www.toutiao.com'
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'
}
cookies = {}  # fill with cookies from your browser to avoid Toutiao's crawler ban
max_behot_time = '0'  # link parameter
title = []            # news titles
source_url = []       # news links
s_url = []            # full news links
source = []           # official accounts that published the news
media_url = {}        # full links of the official accounts

def get_as_cp():  # derive the as and cp parameters; based on Toutiao's obfuscated js file home_4abea46.js
    zz = {}
    now = round(time.time())
    print(now)  # current machine time
    e = hex(int(now)).upper()[2:]  # hex() renders an integer as a hexadecimal string
    print('e:', e)
    a = hashlib.md5()  # hashlib.md5().hexdigest() returns the hash as a hex string
    print('a:', a)
    a.update(str(int(now)).encode('utf-8'))
    i = a.hexdigest().upper()
    print('i:', i)
    if len(e) != 8:
        zz = {'as': '479BB4B7254C150',
              'cp': '7E0AC8874BB0985'}
        return zz
    n = i[:5]
    a = i[-5:]
    r = ''
    s = ''
    for i in range(5):
        s = s + n[i] + e[i]
    for j in range(5):
        r = r + e[j + 3] + a[j]
    zz = {
        'as': 'A1' + s + e[-3:],
        'cp': e[0:3] + r + 'E1'
    }
    print('zz:', zz)
    return zz

def getdata(url, headers, cookies):  # fetch and parse the feed JSON
    r = requests.get(url, headers=headers, cookies=cookies)
    print(url)
    data = json.loads(r.text)
    return data

def savedata(title, s_url, source, media_url):  # save the results to an xlsx file
    wb = Workbook()
    if not os.path.isdir(os.getcwd() + '/result'):  # create the output folder if it is missing
        os.makedirs(os.getcwd() + '/result')
    filename = os.getcwd() + '/result/result-' + datetime.datetime.now().strftime('%Y-%m-%d-%H-%m') + '.xlsx'
    ws = wb.active
    ws.title = 'data'  # rename the worksheet
    ws['A1'] = '标题'   # header row
    ws['B1'] = '新闻链接'
    ws['C1'] = '头条号'
    ws['D1'] = '头条号链接'
    for row in range(2, len(title) + 2):  # write the data rows
        _ = ws.cell(column=1, row=row, value=title[row - 2])
        _ = ws.cell(column=2, row=row, value=s_url[row - 2])
        _ = ws.cell(column=3, row=row, value=source[row - 2])
        _ = ws.cell(column=4, row=row, value=media_url[source[row - 2]])
    wb.save(filename=filename)

def main(max_behot_time, title, source_url, s_url, source, media_url):
    # each iteration is like one feed refresh; a refresh normally yields 10 items but
    # sometimes fewer, so the final count is not necessarily a multiple of 10
    for i in range(3):
        # the url layout changes over time; the returned JSON just needs its fields separated out
        ascp = get_as_cp()  # get the as and cp parameters
        demo = getdata(start_url + max_behot_time + '&max_behot_time_tmp=' + max_behot_time + '&tadrequire=true&as=' + ascp['as'] + '&cp=' + ascp['cp'], headers, cookies)
        print(demo)
        # time.sleep(1)
        for j in range(len(demo['data'])):
            if demo['data'][j]['title'] not in title:
                title.append(demo['data'][j]['title'])            # news title
                source_url.append(demo['data'][j]['source_url'])  # news link
                source.append(demo['data'][j]['source'])          # publishing account
                if demo['data'][j]['source'] not in media_url:
                    media_url[demo['data'][j]['source']] = url + demo['data'][j]['media_url']  # account link
        print(max_behot_time)
        max_behot_time = str(demo['next']['max_behot_time'])  # max_behot_time value for the next request
    for index in range(len(title)):
        print('标题:', title[index])
        if 'https' not in source_url[index]:
            s_url.append(url + source_url[index])
            print('新闻链接:', url + source_url[index])
        else:
            print('新闻链接:', source_url[index])
            s_url.append(source_url[index])
        print('头条号:', source[index])
    print(len(title))  # number of collected items

if __name__ == '__main__':
    main(max_behot_time, title, source_url, s_url, source, media_url)
    savedata(title, s_url, source, media_url)
@@ -215,7 +215,7 @@ class BaseCore:
         except :
             pass
     def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                                            charset='utf8mb4')
         self.__cursor_proxy= self.__cnx_proxy.cursor()
         pass
@@ -310,27 +310,28 @@ class BaseCore:
             proxy_list.append(proxy)
         return proxy_list[random.randint(0, 3)]

-    def get_proxy(self):
-        ip_list = []
-        with self.__cursor_proxy as cursor:
-            sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
-            print(sql_str)
-            cursor.execute(sql_str)
-            rows = cursor.fetchall()
-            for row in tqdm(rows):
-                str_ip = row[0]
-                str_ip_list = str_ip.split('-')
-                proxyMeta = "http://%(host)s:%(port)s" % {
-                    "host": str_ip_list[0],
-                    "port": str_ip_list[1],
-                }
-                proxy = {
-                    "HTTP": proxyMeta,
-                    "HTTPS": proxyMeta
-                }
-                ip_list.append(proxy)
-        return ip_list
+    # def get_proxy(self):
+    #     ip_list = []
+    #     with self.__cursor_proxy as cursor:
+    #         sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
+    #         print(sql_str)
+    #         cursor.execute(sql_str)
+    #         rows = cursor.fetchall()
+    #         for row in tqdm(rows):
+    #             str_ip = row[0]
+    #             str_ip_list = str_ip.split('-')
+    #             proxyMeta = "http://%(host)s:%(port)s" % {
+    #                 "host": str_ip_list[0],
+    #                 "port": str_ip_list[1],
+    #             }
+    #             proxy = {
+    #                 "HTTP": proxyMeta,
+    #                 "HTTPS": proxyMeta
+    #             }
+    #             ip_list.append(proxy)
+    #
+    #     return ip_list

     def get_proxyIPPort(self):
         ip_list = []
         with self.__cursor_proxy as cursor:
...
@@ -23,6 +23,9 @@ from smart_extractor import SmartExtractor
 from selenium.webdriver.chrome.service import Service
 from selenium import webdriver
 from urllib.parse import quote, unquote
+from requests.packages import urllib3
+import traceback
+urllib3.disable_warnings()

 class QQnewsSpider(object):
@@ -46,7 +49,7 @@ class QQnewsSpider(object):
         self.wordsCode = wordsCode
         self.sid = sid

-    # insert the list data into table baidu_search_result
+    # insert the list data into table meta_search_result
     def itemInsertToTable(self,items):
         try:
             itemdata=[]
@@ -56,7 +59,7 @@ class QQnewsSpider(object):
                 data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
                 itemdata.append(data)
-            sql ="INSERT into baidu_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
+            sql ="INSERT into meta_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
             cursorM.executemany(sql, itemdata)
             self.logger.info("数据插入数据库成功!")
             # SQL statement for the insert
@@ -257,6 +260,7 @@ class QQnewsSpider(object):
             res.encoding='utf-8'
             text=res.text
         except Exception as e:
+            traceback.print_exc()
             text=''
         return text
@@ -279,7 +283,7 @@ class QQnewsSpider(object):
     def get_page_html(self):
         # set the list page and the number of pages to collect
         url='https://i.news.qq.com/gw/pc_search/result'
-        totalnum=3
+        totalnum=5
         keyword=self.searchkw
         # keyword='浙江国有资本运营公司'
         for pagenum in range(0,totalnum):
@@ -318,7 +322,7 @@ class QQnewsSpider(object):
                 bdetail=self.getDetailmsg(detailmsg)
                 processitem=self.getProcessitem(bdetail)
                 try:
-                    self.sendkafka(processitem)
+                    # self.sendkafka(processitem)
                     self.r.sadd('pyqqnews_'+self.wordsCode, processitem['sourceAddress'])
                 except Exception as e:
                     self.logger.info("放入kafka失败!")
@@ -422,8 +426,7 @@ class QQnewsSpider(object):
         sm=SmartExtractor(lang)
         try:
             raw_html=self.reqHtml(url)
-            # raw_html=''
             if raw_html:
                 try:
                     soup=BeautifulSoup(raw_html,'html.parser')
@@ -439,8 +442,10 @@ class QQnewsSpider(object):
                     content=article.cleaned_text
                     contentWithTag=article.text
                 if content:
+                    self.logger.info("req请求成功!")
                     return content,contentWithTag
             try:
+                self.logger.info("打开模拟浏览器")
                 raw_html=self.webDriver(url)
                 if raw_html:
                     try:
@@ -451,6 +456,7 @@ class QQnewsSpider(object):
                 except Exception as e:
                     self.logger.info("定位解析失败!")
                 if content:
+                    self.logger.info("模拟浏览器抽取成功!")
                     return content,contentWithTag
             sm=SmartExtractor(lang)
...
@@ -168,7 +168,7 @@ class QQnewsTaskJob(object):
         try:
             qqnewsSpider.get_page_html()
         except Exception as e:
-            logger.info('搜狗搜索异常'+searchkw)
+            logger.info('腾讯新闻搜索异常'+searchkw)
         if qqnewsSpider.detailList.qsize() != 0:
             try:
...
@@ -3,6 +3,7 @@ from urllib.parse import urljoin
 import pymysql
 import requests
+import urllib3
 from bs4 import BeautifulSoup
 from gne import GeneralNewsExtractor
 from langid import langid
@@ -24,7 +25,7 @@ from kafka import KafkaProducer
 import json
 from baseCore import BaseCore
 import configparser
+urllib3.disable_warnings()
 from smart_extractor import SmartExtractor
@@ -54,6 +55,7 @@ class SougouSpider(object):
         self.searchkw = searchkw
         self.wordsCode = wordsCode
         self.sid = sid
+
     def createDriver(self):
         chrome_driver =self.config.get('selenium', 'chrome_driver')
         path = Service(chrome_driver)
@@ -63,7 +65,7 @@ class SougouSpider(object):
         # proxy = "127.0.0.1:8080" # proxy host and port
         # chrome_options.add_argument('--proxy-server=http://' + proxy)
         self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
-    # insert the list data into table baidu_search_result
+    # insert the list data into table meta_search_result
     def itemInsertToTable(self,items):
         try:
             itemdata=[]
@@ -73,7 +75,7 @@ class SougouSpider(object):
                 data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
                 itemdata.append(data)
-            sql ="INSERT into baidu_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
+            sql ="INSERT into meta_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
             cursorM.executemany(sql, itemdata)
             self.logger.info("数据插入数据库成功!")
             # SQL statement for the insert
@@ -422,7 +424,7 @@ class SougouSpider(object):
                 bdetail=self.getDetailmsg(detailmsg)
                 processitem=self.getProcessitem(bdetail)
                 try:
-                    self.sendkafka(processitem)
+                    # self.sendkafka(processitem)
                     self.r.sadd('pysougou_'+self.wordsCode, processitem['sourceAddress'])
                 except Exception as e:
                     self.logger.info("放入kafka失败!")
...
@@ -215,7 +215,7 @@ class BaseCore:
         except :
             pass
     def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                                            charset='utf8mb4')
         self.__cursor_proxy= self.__cnx_proxy.cursor()
         pass
@@ -310,27 +310,28 @@ class BaseCore:
             proxy_list.append(proxy)
         return proxy_list[random.randint(0, 3)]

-    def get_proxy(self):
-        ip_list = []
-        with self.__cursor_proxy as cursor:
-            sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
-            print(sql_str)
-            cursor.execute(sql_str)
-            rows = cursor.fetchall()
-            for row in tqdm(rows):
-                str_ip = row[0]
-                str_ip_list = str_ip.split('-')
-                proxyMeta = "http://%(host)s:%(port)s" % {
-                    "host": str_ip_list[0],
-                    "port": str_ip_list[1],
-                }
-                proxy = {
-                    "HTTP": proxyMeta,
-                    "HTTPS": proxyMeta
-                }
-                ip_list.append(proxy)
-        return ip_list
+    # def get_proxy(self):
+    #     ip_list = []
+    #     with self.__cursor_proxy as cursor:
+    #         sql_str = '''select PROXY from clb_proxy where id={} '''.format(random.randint(1, 12))
+    #         print(sql_str)
+    #         cursor.execute(sql_str)
+    #         rows = cursor.fetchall()
+    #         for row in tqdm(rows):
+    #             str_ip = row[0]
+    #             str_ip_list = str_ip.split('-')
+    #             proxyMeta = "http://%(host)s:%(port)s" % {
+    #                 "host": str_ip_list[0],
+    #                 "port": str_ip_list[1],
+    #             }
+    #             proxy = {
+    #                 "HTTP": proxyMeta,
+    #                 "HTTPS": proxyMeta
+    #             }
+    #             ip_list.append(proxy)
+    #
+    #     return ip_list

     def get_proxyIPPort(self):
         ip_list = []
         with self.__cursor_proxy as cursor:
...
@@ -3,6 +3,7 @@ from urllib.parse import urljoin
 import pymysql
 import requests
+import urllib3
 from bs4 import BeautifulSoup
 from gne import GeneralNewsExtractor
 from langid import langid
@@ -23,7 +24,7 @@ from smart_extractor import SmartExtractor
 from selenium.webdriver.chrome.service import Service
 from selenium import webdriver
 from urllib.parse import quote, unquote
+urllib3.disable_warnings()

 class SouhunewsSpider(object):
     def __init__(self,searchkw,wordsCode,sid):
@@ -31,9 +32,9 @@ class SouhunewsSpider(object):
         self.config = configparser.ConfigParser()
         # read the config file
         self.config.read('config.ini')
-        baseCore=BaseCore()
-        self.logger=baseCore.getLogger()
-        self.url = 'https://www.sogou.com/'
+        self.baseCore=BaseCore()
+        self.logger=self.baseCore.getLogger()
+        self.url = 'https://www.sohu.com/'
         self.r = redis.Redis(host=self.config.get('redis', 'host'),
                              port=self.config.get('redis', 'port'),
                              password=self.config.get('redis', 'pass'), db=0)
@@ -46,7 +47,7 @@ class SouhunewsSpider(object):
         self.wordsCode = wordsCode
         self.sid = sid

-    # insert the list data into table baidu_search_result
+    # insert the list data into table meta_search_result
     def itemInsertToTable(self,items):
         try:
             itemdata=[]
@@ -56,7 +57,7 @@ class SouhunewsSpider(object):
                 data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
                 itemdata.append(data)
-            sql ="INSERT into baidu_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
+            sql ="INSERT into meta_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
             cursorM.executemany(sql, itemdata)
             self.logger.info("数据插入数据库成功!")
             # SQL statement for the insert
@@ -241,13 +242,26 @@ class SouhunewsSpider(object):
             'Accept-Language':'zh-CN,zh;q=0.9',
             'Cookie':'SUV=1695794576771hvzq2n; clt=1695794576; cld=20230927140256; t=1695794594569; reqtype=pc; gidinf=x099980109ee17b02bbffd42800081cb2f516277b38e',
         }
-        proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
+        # proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
+        proxy =self.baseCore.get_proxy()
         try:
-            res=requests.get(url,headers=headers,verify=False,timeout=10)
+            res=requests.get(url,headers=headers,proxies=proxy,verify=False,timeout=10)
             res.encoding='utf-8'
             text=res.text
         except Exception as e:
-            text=''
+            try:
+                res=requests.get(url,headers=headers,verify=False,timeout=10)
+                res.encoding='utf-8'
+                text=res.text
+            except Exception as e:
+                text=''
+        if text=='':
+            try:
+                res=requests.get(url,headers=headers,verify=False,timeout=10)
+                res.encoding='utf-8'
+                text=res.text
+            except Exception as e:
+                text=''
         return text

     def get_realurl(self,tmpurl):
@@ -272,7 +286,7 @@ class SouhunewsSpider(object):
     # collect each page of results
     def get_page_html(self):
         # set the list page and the number of pages to collect
-        totalnum=3
+        totalnum=5
         keyword=self.searchkw
         # keyword='浙江国有资本运营公司'
         for pagenum in range(0,totalnum):
@@ -319,7 +333,7 @@ class SouhunewsSpider(object):
                 bdetail=self.getDetailmsg(detailmsg)
                 processitem=self.getProcessitem(bdetail)
                 try:
-                    self.sendkafka(processitem)
+                    # self.sendkafka(processitem)
                     self.r.sadd('pysouhunews_'+self.wordsCode, processitem['sourceAddress'])
                 except Exception as e:
                     self.logger.info("放入kafka失败!")
@@ -418,8 +432,8 @@ class SouhunewsSpider(object):
     def extractorMsg(self,url,title):
         content=''
         contentWithTag=''
-        lang=''
-        lang=self.detect_language(title)
+        lang='zh'
+        # lang=self.detect_language(title)
         sm=SmartExtractor(lang)
         try:
             raw_html=self.reqSouhuHtml(url)
@@ -514,7 +528,7 @@ class SouhunewsSpider(object):
         if content!='':
             processitem={
                 "sid":self.sid,
-                "source":"5",
+                "source":"3",
                 "title":bdetail['title'],
                 "content":bdetail['content'],
                 "contentWithtag":bdetail['contentHtml'],
...
@@ -168,7 +168,7 @@ class SouhunewsTaskJob(object):
         try:
             souhunewsSpider.get_page_html()
         except Exception as e:
             logger.info('搜搜索异常'+searchkw)
         if souhunewsSpider.detailList.qsize() != 0:
             try:
...
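reqSouhuHtml above retries the same GET up to three times: once through a database-supplied proxy, then twice direct. The same idea as a loop, sketched with a hypothetical helper name and the headers/proxy objects from the method:

def fetch_with_retry(url, headers, proxy, attempts=3, timeout=10):
    # First attempt goes through the proxy; later attempts go direct.
    for i in range(attempts):
        try:
            res = requests.get(url, headers=headers,
                               proxies=proxy if i == 0 else None,
                               verify=False, timeout=timeout)
            res.encoding = 'utf-8'
            return res.text
        except Exception:
            continue
    return ''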
from curl_cffi import requests

def test1():
    # the impersonate argument picks which browser TLS fingerprint to mimic
    r = requests.get("https://tls.browserleaks.com/json", impersonate="chrome101")
    print(r.json())

def test2():
    proxies = {"http": "http://127.0.0.1:1080", "https": "http://127.0.0.1:1080"}
    r = requests.get("http://baidu.com",
                     proxies=proxies,
                     allow_redirects=False,
                     impersonate="chrome101")
    print(r.text)

def test3():
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
    }
    url = "http://baidu.com"
    # the impersonate argument picks which browser TLS fingerprint to mimic
    response = requests.get(url, headers=headers, impersonate="chrome101")
    print(response.text)

test3()
# connect timeout in seconds
# default value is 30s
connect_timeout=300
# network timeout in seconds
# default value is 30s
network_timeout=600
# the base path to store log files
#base_path=/home/tarena/django-project/cc_shop1/cc_shop1/logs
# tracker_server can occur more than once, and tracker_server format is
# "host:port", host can be hostname or ip address
tracker_server=114.115.215.96:22122
#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
# connections whose the idle time exceeds this time will be closed
# unit: second
# default value is 3600
# since V4.05
connection_pool_max_idle_time = 3600
# if load FastDFS parameters from tracker server
# since V4.05
# default value is false
load_fdfs_parameters_from_tracker=false
# if use storage ID instead of IP address
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# default value is false
# since V4.05
use_storage_id = false
# specify storage ids filename, can use relative or absolute path
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# since V4.05
storage_ids_filename = storage_ids.conf
#HTTP settings
http.tracker_server_port=80
#use "#include" directive to include HTTP other settiongs
##include http.conf
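This is a standard FastDFS client.conf. A minimal upload sketch against the tracker configured above, assuming the py3 fdfs_client-py package is installed and this file is saved as client.conf (both are assumptions, not part of the commit):

from fdfs_client.client import Fdfs_client, get_tracker_conf

# parse client.conf (tracker_server=114.115.215.96:22122 above) and upload one file
client = Fdfs_client(get_tracker_conf('client.conf'))
ret = client.upload_by_filename('test.pdf')  # returns a dict with the remote file_id on success
print(ret)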
from loguru import logger

logger.add('logs/z_project.log',
           level='DEBUG',
           format='{time:YYYY-MM-DD HH:mm:ss} - {level} - {file} - {line} - {message}',
           rotation="10 MB")
logger.info('可以写日志了')
import pdfplumber
import re
import os

def re_text(bt, text):
    m1 = re.search(bt, text)
    if m1 is not None:
        return re_block(m1[0])

def re_block(text):
    return text.replace(' ', '').replace(' ', '').replace(')', '').replace(')', '').replace(':', ':')

def get_pdf(dir_path):
    pdf_file = []
    for root, sub_dirs, file_names in os.walk(dir_path):
        for name in file_names:
            if name.endswith('.pdf'):
                filepath = os.path.join(root, name)
                pdf_file.append(filepath)
    return pdf_file

def read():
    filenames = get_pdf(r'C:\Users\WIN10\Desktop\a')  # change to your own directory
    for filename in filenames:
        print(filename)
        with pdfplumber.open(filename) as pdf:
            first_page = pdf.pages[0]
            pdf_text = first_page.extract_text()
            print(pdf_text)
            if '发票' not in pdf_text:
                continue
            print('--------------------------------------------------------')
            print(re_text(re.compile(r'[\u4e00-\u9fa5]+电子普通发票.*?'), pdf_text))
            t2 = re_text(re.compile(r'[\u4e00-\u9fa5]+专用发票.*?'), pdf_text)
            if t2:
                print(t2)
            # print(re_text(re.compile(r'发票代码(.*\d+)'), pdf_text))
            print(re_text(re.compile(r'发票号码(.*\d+)'), pdf_text))
            print(re_text(re.compile(r'开票日期(.*)'), pdf_text))
            print(re_text(re.compile(r'名\s*称\s*[::]\s*([\u4e00-\u9fa5]+)'), pdf_text))
            print(re_text(re.compile(r'纳税人识别号\s*[::]\s*([a-zA-Z0-9]+)'), pdf_text))
            price = re_text(re.compile(r'小写.*(.*[0-9.]+)'), pdf_text)
            print(price)
            company = re.findall(re.compile(r'名.*称\s*[::]\s*([\u4e00-\u9fa5]+)'), pdf_text)
            if company:
                print(re_block(company[len(company) - 1]))
            print('--------------------------------------------------------')

read()
@@ -7,7 +7,7 @@ import openpyxl
 from urllib.parse import urlparse

 # open the Excel file
-workbook = openpyxl.load_workbook('name.xlsx')
+workbook = openpyxl.load_workbook(r'C:\Users\WIN10\Desktop\aa\qiye.xlsx')

 # get the worksheet object
 worksheet = workbook.active
@@ -17,14 +17,14 @@ qiyedatas=[]
 for row in worksheet.iter_rows(values_only=True):
     qiyemsg={
         'yname':row[0],
-        'name':row[1],
-        'url':row[2]
+        'name':row[0],
+        'url':row[1]
     }
     qiyedatas.append(qiyemsg)
     # print each row's data
     # print(row)

-conn = pymysql.Connect(host='114.116.44.11', port=3306, user='root', passwd='f7s0&7qqtK', db='clb_project',
+conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
                        charset='utf8')
 cursor = conn.cursor()
 sql1 = """select id, info_source_code, web_site_name, site_name , site_uri from info_source WHERE web_site_name like '%[name]%' """
...