提交 e6a673ab 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

; Redis instance used by the Python scripts below; they read host, port and pass.
[redis]
;host=127.0.0.1
host=192.168.1.234
port=6379
pass=
; NOTE(review): database credentials are committed here in plain text.
[mysql]
host=114.115.159.144
username=caiji
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
; Paths read by the hgCookie class (chrome_driver and binary_location).
[selenium]
chrome_driver=D:\chrome\chromedriver.exe
binary_location=D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe
; year and endMonth are comma-separated lists; the scripts split them on ',' and convert each item with int().
[param]
; NOTE(review): configparser returns this value verbatim (quotes and doubled backslashes included)
; and the downloader concatenates it directly with the year, so it presumably should be
; unquoted and end with a backslash - confirm.
path='D:\\haiguan\\codets'
year=2023,2022
endMonth=7,6,5,4,3,2,1
import configparser
import csv
import glob
import os
import re
import pandas as pd
def combinFiles(inFileName,outFileName):
    """Merge every CSV matching a glob pattern into one GBK-encoded CSV.

    Each input file is read as text (GBK, empty cells kept as empty strings),
    its last column is dropped, and the rows are appended in sorted file-name
    order. Files that cannot be read are reported and skipped.

    Args:
        inFileName: glob pattern selecting the input CSV files.
        outFileName: path of the merged CSV file to write.

    Returns:
        ``outFileName``, unchanged.
    """
    frames = []
    # Sorting makes the merged row order independent of directory listing order.
    for file in sorted(glob.glob(inFileName, recursive=True)):
        print('------'+file)
        try:
            data = pd.read_csv(file, encoding='gbk', dtype=str, keep_default_na=False)
        except Exception as e2:
            # Skip the file: falling through would use an undefined frame on
            # the first file, or re-append the previous file's rows.
            print(e2)
            continue
        # Drop the last column.
        frames.append(data.iloc[:, :-1])
    # A single concat avoids re-copying the accumulated frame for every file.
    merged_data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    merged_data.to_csv(outFileName, encoding='gbk', index=False,
                       quoting=csv.QUOTE_ALL, quotechar='"', escapechar='\\')
    print(f'文件名称:{outFileName}')
    print('合并完成!!')
    return outFileName
def fileclearn(csvFile,outxlsFile,recordName,iEType_name):
    """Clean a merged customs CSV and save it as an xlsx workbook.

    The commodity code is left-padded with zeros to 8 characters, carriage
    returns are removed from the commodity name, two leading columns (report
    period and import/export flag) are inserted, every column after the USD
    column is dropped, and the quantity/USD columns are converted to numbers.

    Args:
        csvFile: path of the GBK-encoded input CSV.
        outxlsFile: path of the xlsx file to write.
        recordName: value stored in the inserted report-period column.
        iEType_name: value stored in the inserted import/export column.

    Returns:
        ``outxlsFile``, unchanged.
    """
    df = pd.read_csv(csvFile, encoding='gbk', dtype=str)
    # Pad commodity codes shorter than 8 characters with leading zeros.
    df['商品编码'] = df['商品编码'].astype(str).str.zfill(8)
    df['商品名称'] = df['商品名称'].str.replace('\r', '', regex=False)
    # Insert the new columns at the front (report period ends up first).
    df.insert(0, '进出口标识', iEType_name)
    df.insert(0, '报告期', recordName)
    # Keep everything up to and including the USD column. The explicit copy
    # makes the following column assignments act on an independent frame
    # instead of on a slice of the original one.
    usd_column_index = df.columns.get_loc('美元')
    df = df.iloc[:, :usd_column_index + 1].copy()
    # Remove thousands separators before converting to numbers.
    df['美元'] = df['美元'].str.replace(',', '', regex=False).astype(float)
    for column in ('第一数量', '第二数量'):
        without_commas = df[column].str.replace(',', '', regex=False)
        # Anything non-numeric (for example '-' or an empty cell) becomes 0.
        df[column] = pd.to_numeric(without_commas, errors='coerce').fillna(0)
    df.to_excel(outxlsFile, index=False, engine='openpyxl')
    print('数据处理完成!')
    return outxlsFile
"""
文件合并
文件路径
D:\hg\2023\07\单月\收发货地址\*--进口.csv
D:\hg\2023\07\累计\收发货地址\
参数 year endmonth 字段类型 进出口类型
"""
def getFileName(year,startMonth,endMonth,field_name,iEType_name):
    """Build the glob pattern for the per-code CSV files of one field.

    A cumulative period (startMonth < endMonth) maps to the cumulative folder
    and a "<year>01-<end>" prefix; otherwise the single-month folder and a
    "<year><end>" prefix are used.
    """
    month_tag = f"{endMonth:02d}"
    if startMonth < endMonth:
        period, kind = f"{year}01-{month_tag}", '累计'
    else:
        period, kind = f"{year}{month_tag}", '单月'
    directory = f"D:\\hg\\{year}\\{month_tag}\\{kind}\\{field_name}\\"
    pattern = f"{period}--{field_name}--{iEType_name}-*.csv"
    return os.path.join(directory, pattern)
def getRecordFileName(year,startMonth,endMonth,field_name,iEType_name):
    """Return the report-period label: "<year>01-<end>" or "<year><end>".

    ``field_name`` and ``iEType_name`` are accepted for signature parity with
    the other name builders but do not influence the result.
    """
    month_tag = f"{endMonth:02d}"
    if startMonth < endMonth:
        return f"{year}01-{month_tag}"
    return f"{year}{month_tag}"
def getOutFileName(year,startMonth,endMonth,field_name,iEType_name):
    """Build the path of the merged CSV written for one field and trade type.

    The file lives directly in the year/month folder; only the period prefix
    differs between cumulative and single-month data.
    """
    month_tag = f"{endMonth:02d}"
    period = f"{year}01-{month_tag}" if startMonth < endMonth else f"{year}{month_tag}"
    directory = f"D:\\hg\\{year}\\{month_tag}\\"
    return os.path.join(directory, f"{period}--{field_name}商品--{iEType_name}.csv")
def getOutFileNameXls(year,startMonth,endMonth,field_name,iEType_name):
    """Build the path of the cleaned xlsx file for one field and trade type.

    Same layout as the merged CSV name, with an ".xlsx" extension.
    """
    month_tag = f"{endMonth:02d}"
    period = f"{year}01-{month_tag}" if startMonth < endMonth else f"{year}{month_tag}"
    directory = f"D:\\hg\\{year}\\{month_tag}\\"
    return os.path.join(directory, f"{period}--{field_name}商品--{iEType_name}.xlsx")
#读取配置并调用方法处理文件
def readConfig():
    """Read config.ini and merge/clean the files of every configured period.

    For each year and end month from the [param] section, the cumulative
    period (start month 1) and, when the end month is after January, the
    single-month period are processed for every field and trade type.
    """
    parser = configparser.ConfigParser()
    parser.read('config.ini')
    year_texts = parser.get('param', 'year')
    month_texts = parser.get('param', 'endMonth')
    for year_text in year_texts.split(','):
        year = int(year_text)
        for month_text in month_texts.split(','):
            endMonth = int(month_text)
            # January has no separate cumulative period.
            startMonths = [1, endMonth] if endMonth > 1 else [1]
            for startMonth in startMonths:
                for field_name in ('贸易伙伴',):
                    for iEType_name in ('进口', '进出口', '出口'):
                        args = (year, startMonth, endMonth, field_name, iEType_name)
                        infileName = getFileName(*args)
                        outFileName = getOutFileName(*args)
                        outfileNameXls = getOutFileNameXls(*args)
                        print(infileName)
                        print(outFileName)
                        # Merge the per-code files into one CSV.
                        outfileNamecsv = combinFiles(infileName, outFileName)
                        # Clean the merged CSV and save it as an Excel file.
                        recordName = getRecordFileName(*args)
                        outfileNameXls = fileclearn(outfileNamecsv, outfileNameXls, recordName, iEType_name)
                        print(outfileNameXls)
# Script entry point: run the configured merge-and-clean pass when executed directly.
if __name__ == '__main__':
readConfig()
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
    """Download customs statistics CSV exports and merge them per field.

    Cookies for the statistics site are taken from the Redis set 'hgcookie'.
    Each download is first written to a temporary directory and copied into
    the data tree under BASE_PATH once its row count is acceptable.
    """

    # Root directory of the downloaded data tree.
    BASE_PATH = 'D:\\hg\\'
    # Directory receiving each raw download before it is checked.
    TMP_PATH = 'D:\\hg\\tmp'
    # Downloads with more parsed rows than this are rejected as too large.
    MAX_ROWS = 9995
    # Encoding of the CSV files on disk; the pandas readers expect GBK.
    FILE_ENCODING = 'gbk'
    # Request headers that do not change between requests.
    _STATIC_HEADERS = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'max-age=0',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Host': 'stats.customs.gov.cn',
        'Origin': 'http://stats.customs.gov.cn',
        'Proxy-Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
    }
    # Query field code -> folder/file label, checked in this order.
    _FIELD_NAMES = (
        ('CODE_TS', '商品'),
        ('ORIGIN_COUNTRY', '贸易伙伴'),
        ('TRADE_MODE', '贸易方式'),
        ('TRADE_CO_PORT', '收发货地址'),
    )
    # Trade-type code -> label used in file names.
    _IETYPE_NAMES = ((0, '出口'), (1, '进口'), (10, '进出口'))

    def __init__(self):
        """Load config.ini and create the Redis client described in it."""
        self.downUrl = "http://stats.customs.gov.cn/queryData/downloadQueryData"
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             # getint passes the port as a number, not as text.
                             port=self.config.getint('redis', 'port'),
                             password=self.config.get('redis', 'pass'), db=0)

    def getcookie(self):
        """Return one cookie string from the Redis set, waiting until one exists.

        NOTE(review): the first attempt pops the cookie from the set while the
        retries only read a random member; confirm that this is intended.
        """
        cookie = self.r.spop('hgcookie')
        while cookie is None:
            time.sleep(10)
            cookie = self.r.srandmember('hgcookie')
        return cookie.decode('utf-8').strip('"')

    def reqDownFile(self, data):
        """POST the query and save the returned CSV to the temporary directory.

        The request is repeated (each time with a new cookie) until a 200
        response has been written to disk.

        Args:
            data: form parameters of the download request.

        Returns:
            Path of the temporary CSV file.
        """
        statuscode = 410
        filename = os.path.join(self.TMP_PATH, '数据文件.csv')
        while statuscode != 200:
            try:
                # The cookie is fetched once per attempt; no cookie is
                # consumed for a header set that is never sent.
                header = dict(self._STATIC_HEADERS, Cookie=self.getcookie())
                response = requests.post(url=self.downUrl, data=data, headers=header,
                                         verify=False, timeout=20)
                response.encoding = 'GB2312'
                statuscode = response.status_code
            except Exception as e:
                print(data)
                print(e)
                statuscode = 412
                continue
            if statuscode == 200:
                try:
                    csv_content = response.text
                    print(csv_content.count("\n"))
                    csv_content = csv_content.replace('\0', '')
                    os.makedirs(self.TMP_PATH, exist_ok=True)
                    # errors='replace' keeps one unencodable character from
                    # failing the write and forcing an endless retry.
                    with open(filename, 'w', encoding=self.FILE_ENCODING, errors='replace') as file:
                        file.write(csv_content)
                    print('CSV文件下载保存成功!')
                    break
                except Exception as e:
                    print(e)
                    statuscode = 411
            else:
                print('CSV文件下载保存失败!')
            print(f'statuscode:{statuscode}')
        return filename

    @staticmethod
    def _period(year, startMonth, endMonth):
        """Return (file-name prefix, period folder, zero-padded end month)."""
        end_str = "{:02d}".format(endMonth)
        if startMonth < endMonth:
            return str(year) + '01-' + end_str, '累计', end_str
        return str(year) + end_str, '单月', end_str

    def filepath(self, iEType, currencyType, year, startMonth, endMonth, outerField1):
        """Return the path of a single-field statistics file, creating its folder."""
        field_name = self.getfieldName(outerField1)
        iEType_name = self.getiETypeName(iEType)
        period, kind, end_str = self._period(year, startMonth, endMonth)
        save_dir = os.path.dirname(self.BASE_PATH + str(year) + '\\' + end_str + '\\' + kind + '\\')
        os.makedirs(save_dir, exist_ok=True)
        return os.path.join(save_dir, period + '--' + field_name + '--' + iEType_name + '.csv')

    def codeFilepath(self, iEType, currencyType, year, startMonth, endMonth, outerField1, code):
        """Return the path of the per-code file of a field, creating its folder."""
        field_name = self.getfieldName(outerField1)
        iEType_name = self.getiETypeName(iEType)
        period, kind, end_str = self._period(year, startMonth, endMonth)
        save_dir = os.path.dirname(
            self.BASE_PATH + str(year) + '\\' + end_str + '\\' + kind + '\\' + field_name + '\\')
        os.makedirs(save_dir, exist_ok=True)
        return os.path.join(
            save_dir, period + '--' + field_name + '--' + iEType_name + '-' + str(code) + '.csv')

    def getfieldName(self, outerField1):
        """Map a query field code (string or list containing it) to its label."""
        for field_code, label in self._FIELD_NAMES:
            if field_code in outerField1:
                return label
        return ''

    def getiETypeName(self, iEType):
        """Map a trade-type code (0, 1 or 10) to its label; '' when unknown."""
        for type_code, label in self._IETYPE_NAMES:
            if type_code == iEType:
                return label
        return ''

    def _queryParam(self, iEType, currencyType, year, startMonth, endMonth,
                    outerField1, outerField2, outerValue1):
        """Build the request form shared by the single and combined queries."""
        return {
            'pageSize': 10,
            'iEType': iEType,
            'currencyType': currencyType,
            'year': year,
            'startMonth': startMonth,
            'endMonth': endMonth,
            'monthFlag': '',
            'unitFlag': False,
            'unitFlag1': False,
            'codeLength': '8',
            'outerField1': outerField1,
            'outerField2': outerField2,
            'outerField3': '',
            'outerField4': '',
            'outerValue1': outerValue1,
            'outerValue2': '',
            'outerValue3': '',
            'outerValue4': '',
            'orderType': 'CODE ASC DEFAULT',
            # 2 for data before 202201, 1 for data from 202201 on.
            'selectTableState': 1 if year > 2021 else 2,
            'currentStartTime': '202201',
        }

    def setparam(self, iEType, currencyType, year, startMonth, endMonth, outerField1):
        """Return the request form for a single-field query."""
        return self._queryParam(iEType, currencyType, year, startMonth, endMonth,
                                outerField1, '', '')

    def setcodesAndProductparam(self, iEType, currencyType, year, startMonth, endMonth, outerField1, filedCode):
        """Return the request form for one field value combined with commodities."""
        return self._queryParam(iEType, currencyType, year, startMonth, endMonth,
                                outerField1, 'CODE_TS', filedCode)

    def tmpToFile(self, tmpfilename, filePathName):
        """Move a temporary download to its final path if it is small enough.

        Returns:
            ``filePathName`` when the file was copied, or '' when it has more
            than MAX_ROWS rows. The temporary file is removed in both cases.
        """
        with open(tmpfilename, 'r', encoding=self.FILE_ENCODING, errors='replace') as file:
            line_count = sum(1 for _ in csv.reader(file))
        if line_count > self.MAX_ROWS:
            print('csv文件行数过大需要对编码进行拆分')
            os.remove(tmpfilename)
            return ''
        shutil.copy(tmpfilename, filePathName)
        os.remove(tmpfilename)
        return filePathName

    def readcsv(self, filePath):
        """Return the first column of every data row (the header is skipped)."""
        with open(filePath, newline='', encoding=self.FILE_ENCODING) as csvfile:
            reader = csv.reader(csvfile)
            # Tolerate an empty file and blank rows instead of raising.
            next(reader, None)
            return [row[0] for row in reader if row]

    def field1Down(self, year, endMonth):
        """Download the single-field statistics files for one end month.

        Covers the cumulative period and, after January, the single month,
        for every trade type and every query field. Existing files are kept.

        Returns:
            The target paths of all files, whether or not they were saved.
        """
        year = int(year)
        currencyType = 'usd'
        startMonths = [1] if endMonth == 1 else [1, endMonth]
        fieldFileList = []
        for startMonth in startMonths:
            for iEType in (0, 1, 10):
                for outerField1 in ('CODE_TS', 'ORIGIN_COUNTRY', 'TRADE_MODE', 'TRADE_CO_PORT'):
                    filePathName = self.filepath(iEType, currencyType, year, startMonth, endMonth, outerField1)
                    fieldFileList.append(filePathName)
                    if os.path.exists(filePathName):
                        continue
                    param = self.setparam(iEType, currencyType, year, startMonth, endMonth, outerField1)
                    saveFileName = self.tmpToFile(self.reqDownFile(param), filePathName)
                    print(saveFileName)
        return fieldFileList

    def fieldCodeDown(self, iEType, currencyType, year, startMonth, endMonth, outerField1, codes):
        """Download one per-code file for every code of a field.

        Returns:
            One entry per code: the saved or already existing path, or '' when
            the download was rejected as too large.
        """
        codeFileList = []
        for code in codes:
            filePathName = self.codeFilepath(iEType, currencyType, year, startMonth, endMonth, outerField1, code)
            if os.path.exists(filePathName):
                print(f'文件已存在{filePathName}')
                codeFileList.append(filePathName)
                continue
            param = self.setcodesAndProductparam(iEType, currencyType, year, startMonth, endMonth, outerField1, code)
            saveFileName = self.tmpToFile(self.reqDownFile(param), filePathName)
            print(saveFileName)
            codeFileList.append(saveFileName)
        return codeFileList

    @staticmethod
    def _lastMatch(text, markers):
        """Return the value of the last marker found in text, or None."""
        found = None
        for marker, value in markers:
            if marker in text:
                found = value
        return found

    def codeFieldDown(self, fieldFileList, year, endMonth):
        """Download the per-code files for every non-commodity field file.

        The query field, period and trade type are derived from markers in
        each file path; files whose path lacks a marker are reported and
        skipped.
        """
        year = int(year)
        currencyType = 'usd'
        for fieldFile in fieldFileList:
            try:
                if '商品' in fieldFile:
                    continue
                outerField1 = self._lastMatch(fieldFile, (
                    ('贸易伙伴', ['ORIGIN_COUNTRY']),
                    ('贸易方式', ['TRADE_MODE']),
                    ('收发货地址', ['TRADE_CO_PORT']),
                ))
                startMonth = self._lastMatch(fieldFile, (('单月', endMonth), ('累计', 1)))
                iEType = self._lastMatch(fieldFile, (('--进口', 1), ('--出口', 0), ('--进出口', 10)))
                if outerField1 is None or startMonth is None or iEType is None:
                    print(f'cannot derive query parameters from {fieldFile}')
                    continue
                # Use this instance rather than a module-level global so the
                # class also works when imported by other code.
                codes = self.readcsv(fieldFile)
                self.fieldCodeDown(iEType, currencyType, year, startMonth, endMonth, outerField1, codes)
            except Exception as e:
                print(e)

    def combinFile(self, refile, outFile):
        """Concatenate all CSV files matching ``refile`` into ``outFile``.

        Raises:
            ValueError: when no file matches the pattern.
        """
        frames = [pd.read_csv(name, encoding=self.FILE_ENCODING)
                  for name in sorted(glob.glob(refile, recursive=True))]
        if not frames:
            raise ValueError(f'no files matched {refile!r}')
        combined = pd.concat(frames)
        combined.to_csv(outFile, index=False, encoding=self.FILE_ENCODING)

    def combinFiles(self, refile, outFile):
        """Merge all matching CSV files as text, dropping each file's last column.

        Empty cells are kept as empty strings (no NaN conversion), matching
        the stand-alone merge script. Every output field is quoted.
        """
        frames = [
            pd.read_csv(name, encoding=self.FILE_ENCODING, dtype=str, keep_default_na=False).iloc[:, :-1]
            for name in sorted(glob.glob(refile, recursive=True))
        ]
        # pd.concat replaces DataFrame.append, which newer pandas no longer has.
        merged_data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        merged_data.to_csv(outFile, encoding=self.FILE_ENCODING, index=False,
                           quoting=csv.QUOTE_ALL, quotechar='"', escapechar='\\')
        print('合并完成!!')

    def fileNameleiji(self):
        """Merge the cumulative 2023-01..07 trade-partner files (import+export)."""
        refile = r'D:\hg\2023\07\累计\贸易伙伴\202301-07--贸易伙伴--进出口-*.csv'
        outFile = r'D:\hg\2023\07\202301-07--贸易伙伴商品--进出口.csv'
        self.combinFile(refile, outFile)

    def fileNamedanyue(self):
        """Merge the single-month 2023-07 trade-partner files (import+export)."""
        refile = r'D:\hg\2023\07\单月\贸易伙伴\202307--贸易伙伴--进出口-*.csv'
        outFile = r'D:\hg\2023\07\202307--贸易伙伴商品--进出口.csv'
        self.combinFile(refile, outFile)
if __name__ == '__main__':
    # Keep the module-level name `hgDownFile`: methods of the class may look
    # the instance up through it.
    hgDownFile = HgDownFile()
    month_texts = hgDownFile.config.get('param', 'endMonth')
    year_texts = hgDownFile.config.get('param', 'year')
    for year_text in year_texts.split(','):
        year = int(year_text)
        for month_text in month_texts.split(','):
            endMonth = int(month_text)
            # Repeat the single-field download until at least 12 files are listed.
            fieldFileList = hgDownFile.field1Down(year, endMonth)
            while len(fieldFileList) < 12:
                fieldFileList = hgDownFile.field1Down(year, endMonth)
            # Two passes over the per-code downloads.
            for _ in range(2):
                hgDownFile.codeFieldDown(fieldFileList, year, endMonth)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
from logRecord import LogRecord
'''
海关商品详情下载流程
1.下载商品编码
2.对商品编码进行分组
3.对商品编码进行重命名
4.拼接路径和创建文件名
5.进行数据的下载
6.去重文件中的 \r换行
7.进行文件的保存
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
log=LogRecord()
class HgDownFile(object):
    """Download commodity-detail CSV exports from the customs statistics site.

    Commodity codes are downloaded first, then queried in groups together
    with trade partner, trade mode and registration place. Cookies come from
    the Redis set 'hgcookie'; the data root is read from config.ini.
    """

    # Directory receiving each raw download before it is checked.
    TMP_PATH = 'D:\\hg\\tmp'
    # Downloads with more parsed rows than this are rejected as too large.
    MAX_ROWS = 9990
    # Number of commodity codes sent in one combined query.
    GROUP_SIZE = 8
    # Encoding used for the CSV files on disk.
    FILE_ENCODING = 'gbk'
    # Request headers that do not change between requests.
    _STATIC_HEADERS = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'max-age=0',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Host': 'stats.customs.gov.cn',
        'Origin': 'http://stats.customs.gov.cn',
        'Proxy-Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
    }
    # Query field code -> folder/file label, checked in this order.
    _FIELD_NAMES = (
        ('CODE_TS', '商品'),
        ('ORIGIN_COUNTRY', '贸易伙伴'),
        ('TRADE_MODE', '贸易方式'),
        ('TRADE_CO_PORT', '收发货地址'),
    )
    # Trade-type code -> label used in file names.
    _IETYPE_NAMES = ((0, '出口'), (1, '进口'), (10, '进出口'))

    def __init__(self):
        """Load config.ini and create the Redis client described in it."""
        self.downUrl = "http://stats.customs.gov.cn/queryData/downloadQueryData"
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             # getint passes the port as a number, not as text.
                             port=self.config.getint('redis', 'port'),
                             password=self.config.get('redis', 'pass'), db=0)

    def getcookie(self):
        """Return one cookie string from the Redis set, waiting until one exists.

        NOTE(review): the first attempt pops the cookie from the set while the
        retries only read a random member; confirm that this is intended.
        """
        cookie = self.r.spop('hgcookie')
        while cookie is None:
            time.sleep(10)
            cookie = self.r.srandmember('hgcookie')
        return cookie.decode('utf-8').strip('"')

    def reqDownFile(self, data):
        """POST the query and save the returned CSV to the temporary directory.

        The request is repeated (each time with a new cookie) until a 200
        response has been written to disk. NUL characters and carriage
        returns are removed from the content.

        Returns:
            Path of the temporary CSV file.
        """
        statuscode = 410
        filename = os.path.join(self.TMP_PATH, '数据文件.csv')
        while statuscode != 200:
            try:
                header = dict(self._STATIC_HEADERS, Cookie=self.getcookie())
                response = requests.post(url=self.downUrl, data=data, headers=header,
                                         verify=False, timeout=20)
                response.encoding = 'GB2312'
                statuscode = response.status_code
            except Exception as e:
                print(data)
                print(e)
                statuscode = 412
                continue
            if statuscode == 200:
                try:
                    csv_content = response.text
                    print(csv_content.count("\n"))
                    csv_content = csv_content.replace('\0', '').replace('\r', '')
                    os.makedirs(self.TMP_PATH, exist_ok=True)
                    # errors='replace' keeps one unencodable character from
                    # failing the write and forcing an endless retry.
                    with open(filename, 'w', encoding=self.FILE_ENCODING, errors='replace') as file:
                        file.write(csv_content)
                    print('CSV文件下载保存成功!')
                    break
                except Exception as e:
                    print(e)
                    statuscode = 411
            else:
                print('CSV文件下载保存失败!')
            print(f'statuscode:{statuscode}')
        return filename

    def _basePath(self):
        """Return the configured data root, ready for concatenation.

        configparser returns the value verbatim, so surrounding quotes are
        stripped and a trailing backslash is added when it is missing.
        """
        path = self.config.get('param', 'path').strip().strip('\'"')
        if path and not path.endswith(('\\', '/')):
            path += '\\'
        return path

    def _ensureDir(self, relative):
        """Create the folder ``relative`` under the data root and return it."""
        save_dir = os.path.dirname(self._basePath() + relative)
        os.makedirs(save_dir, exist_ok=True)
        return save_dir

    @staticmethod
    def _periodLabel(year, startMonth, endMonth):
        """Return (labelled period prefix, period folder, zero-padded end month)."""
        end_str = "{:02d}".format(endMonth)
        if startMonth < endMonth:
            return str(year) + "年--01月-" + end_str + "月", '累计', end_str
        return str(year) + "年--" + end_str + "月", '单月', end_str

    def filepath(self, iEType, currencyType, year, startMonth, endMonth, outerField1):
        """Return the path of a single-field statistics file, creating its folder."""
        field_name = self.getfieldName(outerField1)
        iEType_name = self.getiETypeName(iEType)
        end_str = "{:02d}".format(endMonth)
        if startMonth < endMonth:
            period, kind = str(year) + '01-' + end_str, '累计'
        else:
            period, kind = str(year) + end_str, '单月'
        save_dir = self._ensureDir(str(year) + '\\' + end_str + '\\' + kind + '\\')
        return os.path.join(save_dir, period + '--' + field_name + '--' + iEType_name + '.csv')

    def codeFilepath(self, iEType, currencyType, year, startMonth, endMonth, outerField1, code):
        """Return the path of one commodity-detail file, creating its folder."""
        field_name = self.getfieldName(outerField1)
        iEType_name = self.getiETypeName(iEType)
        label, kind, end_str = self._periodLabel(year, startMonth, endMonth)
        save_dir = self._ensureDir(str(year) + '\\' + end_str + '\\' + kind + '\\' + field_name + '\\')
        filename = label + "--" + iEType_name + "--商品-贸易伙伴-贸易方式-注册地--" + str(code) + ".csv"
        return os.path.join(save_dir, filename)

    def codeFilepathMsg(self, iEType, currencyType, year, startMonth, endMonth, outerField1):
        """Return the path of the file-to-codes statistics CSV of a period."""
        iEType_name = self.getiETypeName(iEType)
        label, _, end_str = self._periodLabel(year, startMonth, endMonth)
        save_dir = self._ensureDir(str(year) + '\\' + end_str + '\\')
        return os.path.join(save_dir, label + "--" + iEType_name + "_文件统计.csv")

    def getfieldName(self, outerField1):
        """Map a query field code (string or list containing it) to its label."""
        for field_code, label in self._FIELD_NAMES:
            if field_code in outerField1:
                return label
        return ''

    def getiETypeName(self, iEType):
        """Map a trade-type code (0, 1 or 10) to its label; '' when unknown."""
        for type_code, label in self._IETYPE_NAMES:
            if type_code == iEType:
                return label
        return ''

    def _queryParam(self, iEType, currencyType, year, startMonth, endMonth, outerFields, outerValue1):
        """Build the request form; ``outerFields`` holds outerField1..4."""
        return {
            'pageSize': 10,
            'iEType': iEType,
            'currencyType': currencyType,
            'year': year,
            'startMonth': startMonth,
            'endMonth': endMonth,
            'monthFlag': '',
            'unitFlag': False,
            'unitFlag1': False,
            'codeLength': '8',
            'outerField1': outerFields[0],
            'outerField2': outerFields[1],
            'outerField3': outerFields[2],
            'outerField4': outerFields[3],
            'outerValue1': outerValue1,
            'outerValue2': '',
            'outerValue3': '',
            'outerValue4': '',
            'orderType': 'CODE ASC DEFAULT',
            # 2 for data before 202201, 1 for data from 202201 on.
            'selectTableState': 1 if year > 2021 else 2,
            'currentStartTime': '202201',
        }

    def setparam(self, iEType, currencyType, year, startMonth, endMonth, outerField1):
        """Return the request form for a single-field query."""
        return self._queryParam(iEType, currencyType, year, startMonth, endMonth,
                                (outerField1, '', '', ''), '')

    def setcodesAndProductparam(self, iEType, currencyType, year, startMonth, endMonth, outerField1, filedCode):
        """Return the request form for the four-field commodity-detail query.

        The four query fields are fixed; ``outerField1`` is accepted for
        signature compatibility but is not used.
        """
        fields = ('CODE_TS', 'ORIGIN_COUNTRY', 'TRADE_MODE', 'TRADE_CO_PORT')
        return self._queryParam(iEType, currencyType, year, startMonth, endMonth, fields, filedCode)

    def tmpToFile(self, tmpfilename, filePathName):
        """Move a temporary download to its final path if it is small enough.

        Returns:
            ``filePathName`` when the file was copied, or '' when it has more
            than MAX_ROWS rows. The temporary file is removed in both cases.
        """
        with open(tmpfilename, 'r', encoding=self.FILE_ENCODING, errors='replace') as file:
            line_count = sum(1 for _ in csv.reader(file))
        if line_count > self.MAX_ROWS:
            print('csv文件行数过大需要对编码进行拆分')
            os.remove(tmpfilename)
            return ''
        shutil.copy(tmpfilename, filePathName)
        os.remove(tmpfilename)
        return filePathName

    def readcsv(self, filePath):
        """Return the first column of every data row (the header is skipped)."""
        with open(filePath, newline='', encoding=self.FILE_ENCODING) as csvfile:
            reader = csv.reader(csvfile)
            # Tolerate an empty file and blank rows instead of raising.
            next(reader, None)
            return [str(row[0]) for row in reader if row]

    def field1Down(self, year, endMonth):
        """Download the commodity-code lists for one end month.

        Covers the cumulative period and, after January, the single month,
        for every trade type. Existing files are kept.

        Returns:
            The target paths of all files, whether or not they were saved.
        """
        year = int(year)
        currencyType = 'usd'
        startMonths = [1] if endMonth == 1 else [1, endMonth]
        fieldFileList = []
        for startMonth in startMonths:
            for iEType in (0, 1, 10):
                for outerField1 in ('CODE_TS',):
                    filePathName = self.filepath(iEType, currencyType, year, startMonth, endMonth, outerField1)
                    fieldFileList.append(filePathName)
                    if os.path.exists(filePathName):
                        continue
                    param = self.setparam(iEType, currencyType, year, startMonth, endMonth, outerField1)
                    saveFileName = self.tmpToFile(self.reqDownFile(param), filePathName)
                    print(saveFileName)
        return fieldFileList

    def _downloadCodes(self, common, code, filecodes, codeFileList):
        """Ensure the file for ``code`` exists and record it in codeFileList.

        Returns:
            False when the download was rejected as too large, else True.
        """
        filePathName = self.codeFilepath(*common, filecodes)
        if os.path.exists(filePathName):
            print(f'文件已存在{filePathName}')
        else:
            param = self.setcodesAndProductparam(*common, code)
            saveFileName = self.tmpToFile(self.reqDownFile(param), filePathName)
            print(saveFileName)
            if saveFileName == '':
                return False
        # Always record a dict so the list can be written with csv.DictWriter.
        codeFileList.append({'文件名': filePathName, '商品编码': code})
        return True

    def fieldCodeDown(self, iEType, currencyType, year, startMonth, endMonth, outerField1, codes):
        """Download the commodity-detail files for all codes, group by group.

        A group whose download is too large is retried one code at a time.

        Returns:
            (list of {'文件名', '商品编码'} dicts for the available files,
            path of the statistics CSV for this period and trade type).
        """
        common = (iEType, currencyType, year, startMonth, endMonth, outerField1)
        codeFileList = []
        for k, code in enumerate(self.group_elements(codes)):
            if self._downloadCodes(common, code, 'cc' + str(k), codeFileList):
                continue
            # Close to the row limit: split the group and fetch each code alone.
            for j, single_code in enumerate(code.split(',')):
                self._downloadCodes(common, single_code, 'cc' + str(k) + '_' + str(j), codeFileList)
        filemsg = self.codeFilepathMsg(*common)
        return codeFileList, filemsg

    @staticmethod
    def _lastMatch(text, markers):
        """Return the value of the last marker found in text, or None."""
        found = None
        for marker, value in markers:
            if marker in text:
                found = value
        return found

    def codeFieldDown(self, fieldFileList, year, endMonth):
        """Download the commodity details for every code-list file.

        The period and trade type are derived from markers in each file path.

        Returns:
            The (codeFileList, filemsg) pair of the last file processed
            successfully; ([], '') when none succeeded.
            NOTE(review): results of earlier files are not returned - confirm
            whether the caller should receive one pair per file.
        """
        year = int(year)
        currencyType = 'usd'
        outerField1 = ['CODE_TS']
        codeFileList = []
        filemsg = ''
        for fieldFile in fieldFileList:
            try:
                startMonth = self._lastMatch(fieldFile, (('单月', endMonth), ('累计', 1)))
                iEType = self._lastMatch(fieldFile, (('--进口', 1), ('--出口', 0), ('--进出口', 10)))
                if startMonth is None or iEType is None:
                    print(f'cannot derive query parameters from {fieldFile}')
                    continue
                # Use this instance rather than a module-level global so the
                # class also works when imported by other code.
                codes = self.readcsv(fieldFile)
                codeFileList, filemsg = self.fieldCodeDown(
                    iEType, currencyType, year, startMonth, endMonth, outerField1, codes)
            except Exception as e:
                print(e)
                continue
        return codeFileList, filemsg

    def group_elements(self, codes):
        """Join the codes into comma-separated groups of GROUP_SIZE items."""
        size = self.GROUP_SIZE
        return [','.join(codes[i:i + size]) for i in range(0, len(codes), size)]

    def outfilemsg(self, codeFileList, filemsg):
        """Write the file-name / commodity-code records to the CSV ``filemsg``."""
        with open(filemsg, 'w', newline='', encoding=self.FILE_ENCODING) as file:
            writer = csv.DictWriter(file, fieldnames=['文件名', '商品编码'])
            writer.writeheader()
            writer.writerows(codeFileList)
# Script entry point: for every configured year and end month, download the
# commodity-code lists, then the commodity details, then write the
# file-to-codes statistics file. Errors are printed and do not stop the run.
if __name__ == '__main__':
hgDownFile=HgDownFile()
yss=hgDownFile.config.get('param', 'year')
ss=hgDownFile.config.get('param', 'endMonth')
# newMonth = hgDownFile.r.get("newMonth").decode('utf-8')
# ms = "{:02d}".format(int(ss))
# Mark the collection as in progress
# hgDownFile.r.set("newhgdatastatus"+yss+"_"+ms,1)
try:
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
print(f'year:{year} end:{endMonth}')
try:
fieldFileList=hgDownFile.field1Down(year,endMonth)
# Repeat the code-list download until the expected number of files is
# listed: 3 when the end month is January, otherwise 6.
if endMonth==1:
while len(fieldFileList)< 3:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 3:
break
else:
while len(fieldFileList)< 6:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 6:
break
for i in range(1,3):
codeFileList,filemsg=hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
# Write the file-name / commodity-code mapping to the statistics file
hgDownFile.outfilemsg(codeFileList,filemsg)
except Exception as ee:
print(ee)
except Exception as e:
print(e)
# hgDownFile.r.set("newhgdatastatus"+yss+"_"+ms,3)
# Mark the collection as finished
# hgDownFile.r.set("newhgdatastatus"+yss+"_"+ms,2)
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import configparser
import redis
import concurrent.futures
from pyquery import PyQuery as pq
class hgCookie(object):
    """Collect cookies and the current report month from the customs query page.

    Settings are read from ``config.ini``; results are stored in redis under
    the set ``hgcookie`` and the key ``newMonth``.
    """

    # Customs statistics query page opened by both collectors.
    QUERY_URL = 'http://stats.customs.gov.cn/queryData/queryDataByWhere'

    def __init__(self):
        # Create the ConfigParser object and read the configuration file
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             port=self.config.get('redis', 'port'),
                             password=self.config.get('redis', 'pass'), db=0)
        self.bin_path = self.config.get('selenium', 'binary_location')
        self.driver_path = self.config.get('selenium', 'chrome_driver')

    def get_webdriver(self):
        """Create a Chrome driver using the configured binary and driver paths."""
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--ignore-certificate-errors')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument('--headless')
        chrome_options.binary_location = self.bin_path
        executable_path = self.driver_path
        driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
        return driver

    def _open_query_page(self, driver):
        """Load the query page and wait up to 10 seconds for the search button."""
        driver.get(self.QUERY_URL)
        wait = WebDriverWait(driver, 10)
        wait.until(EC.presence_of_element_located((By.ID, "doSearch")))

    def reqGetCookie(self):
        """Open the query page and add its cookies, as one string, to ``hgcookie``."""
        driver = self.get_webdriver()
        try:
            self._open_query_page(driver)
            # Collect every cookie set while the page loaded
            cookies = driver.get_cookies()
            cookie_str = '; '.join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
            # An empty string is useless to consumers of the pool, so skip it.
            if cookie_str:
                self.r.sadd('hgcookie', cookie_str)
        finally:
            # Always release the browser, even when the wait times out.
            driver.quit()

    def runSpider(self, i):
        """Thread-pool task: print the task number and collect one cookie."""
        print(i)
        self.reqGetCookie()

    def getnewMonth(self):
        """Read the selected end month from the query page and store it as ``newMonth``."""
        driver = self.get_webdriver()
        try:
            self._open_query_page(driver)
            html = driver.page_source
            doc = pq(html)
            endMonth = doc('select[id="endMonth"]>option[selected="selected"]').text()
            print(f'海关页面的月份{endMonth}')
            # Do not overwrite a previously stored month with an empty value.
            if endMonth:
                self.r.set('newMonth', endMonth)
        finally:
            # Always release the browser, even when the wait times out.
            driver.quit()
if __name__ == '__main__':
    # Entry point: store the current month once, then keep refilling the
    # redis cookie pool forever.
    hgCookie = hgCookie()
    hgCookie.getnewMonth()
    while True:
        size = hgCookie.r.scard('hgcookie')
        print(f'海关的cookie数量:{size}')
        if size > 100:
            time.sleep(60)
        # NOTE(review): after the sleep another batch is still collected; if
        # the intent was to stop refilling above 100 cookies, a `continue`
        # is missing here — confirm.
        kwList = list(range(1, 101))
        if not kwList:
            continue
        # Thread pool with 2 workers; each task collects one cookie
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            results = [executor.submit(hgCookie.runSpider, data) for data in kwList]
            # Report every task as it completes
            for future in concurrent.futures.as_completed(results):
                try:
                    result = future.result()
                    print(f"任务执行结束: {result}")
                except Exception as e:
                    # A failed task is reported and does not stop the batch
                    print(f"任务执行exception: {e}")
\ No newline at end of file
import os
import redis
from flask import Flask, request, send_file, render_template, jsonify
import json
import pymysql
from pyquery import PyQuery as pq
from flask_cors import cross_origin
'''
手动捕获请求的接口数据,实现解析
使用fiddler将链接对应的页面数据信息发送到后台,后台对数据进行解析
'''
# Module-level redis client (local server, database 0) shared by the route handlers below.
r = redis.Redis(host='127.0.0.1', port='6379', db=0)
def connMysql():
    """Open a MySQL connection and return it together with a new cursor.

    Returns:
        A ``(connection, cursor)`` tuple; the caller is responsible for
        closing both.
    """
    # NOTE(review): host and credentials are hard-coded in source; consider
    # loading them from configuration instead.
    settings = {
        'host': '114.115.159.144',
        'user': 'caiji',
        'password': 'zzsn9988',
        'database': 'caiji',
    }
    connection = pymysql.connect(**settings)
    return connection, connection.cursor()
def closeSql(conx, cursorM):
    """Close the cursor and then the connection.

    The connection is closed even if closing the cursor raises; the
    exception from the cursor then propagates to the caller.

    Args:
        conx: Open database connection.
        cursorM: Cursor created from ``conx``.
    """
    try:
        cursorM.close()
    finally:
        conx.close()
def itemInsertToTable(item):
    """Insert one holding record into the ``qccholdmsg`` table.

    The record is skipped when a row with the same ``yKeyNo`` and ``zKeyNo``
    already exists. The connection is always closed, including on the
    early return and when a statement fails.

    Args:
        item: Mapping that provides every column listed in ``columns``.

    Raises:
        KeyError: If ``item`` lacks one of the required keys.
    """
    columns = ('yKeyNo', 'yCompanyName', 'nameCount', 'zKeyNo', 'zName',
               'registCapi', 'province', 'industry', 'shortStatus', 'percentTotal',
               'startDateStr', 'h5Url', 'district', 'industryDesc', 'area', 'industryItem')
    # Read all values before connecting so a malformed item cannot leave a
    # connection open.
    insert_param = tuple(item[column] for column in columns)
    zKeyNo = item['zKeyNo']
    yKeyNo = item['yKeyNo']
    conx, cursorM = connMysql()
    try:
        try:
            # Parameterized to avoid SQL injection through the key values.
            select_sql = 'select * from qccholdmsg where yKeyNo=%s and zKeyNo=%s'
            cursorM.execute(select_sql, (yKeyNo, zKeyNo))
            existing_record = cursorM.fetchone()
        except Exception as e:
            # Best effort, as before: a failed lookup does not block the insert.
            print(e)
            existing_record = ''
        if existing_record:
            print(f'数据已存在!{zKeyNo}')
            return
        insert_sql = "INSERT into qccholdmsg ({}) VALUES ({})".format(
            ','.join(columns), ', '.join(['%s'] * len(columns)))
        cursorM.execute(insert_sql, insert_param)
        conx.commit()
        print('数据插入成功!')
    finally:
        closeSql(conx, cursorM)
# Flask application object; the route handlers below register themselves on it.
app = Flask(__name__)
@app.route('/')
@cross_origin()
def index():
    """Root route: reply with a fixed welcome text."""
    greeting = 'Welcome to the website!'
    return greeting
@app.route('/get_hold', methods=['POST'])
@cross_origin()
def get_news():
    """Receive posted holding data.

    The form data is read but not processed yet. A response is returned
    because Flask rejects a view that returns ``None`` with a server error.
    """
    data = request.form
    # NOTE(review): `data` is currently unused — presumably it should be
    # parsed and stored; confirm the intended handling.
    return 'success'
@app.route('/task/setCookie', methods=['GET'])
# @cross_origin()
def setCookie():
    """Add the ``cookie`` query parameter to the redis set ``hgcookie``.

    A missing or empty cookie is not stored. Failures are printed and the
    fixed response text is still returned.
    """
    try:
        cookie = request.args.get('cookie')
        if cookie:
            r.sadd('hgcookie', cookie)
        else:
            # An empty member would be handed out later as an unusable cookie.
            print('error: missing cookie parameter')
    except Exception as e:
        print(f'error: {e}')
    # NOTE(review): the response text is kept byte-for-byte ('succes') in case
    # a client compares it; it is probably a typo for 'success'.
    return 'succes'
@app.route('/task/getCookieSize', methods=['GET'])
@cross_origin()
def getCookieSize():
    """Return the number of cookies in the redis set ``hgcookie`` as JSON.

    Returns:
        A JSON object with ``code``, ``msg`` and ``data``. On success
        ``code`` is 200 and ``data`` is the set size; when redis cannot be
        queried ``code`` is 500 and ``data`` is 0, so a client can tell the
        two cases apart.
    """
    try:
        size = r.scard('hgcookie')
    except Exception as e:
        print(f'error: {e}')
        return jsonify({
            "code": 500,
            "msg": "操作失败",
            "data": 0
        })
    return jsonify({
        "code": 200,
        "msg": "操作成功",
        "data": size
    })
@app.route('/task/getHtml', methods=['POST'])
# @cross_origin()
def getnewMonth():
    """Parse the selected end month from posted HTML and store it as ``newMonth``.

    The page HTML is taken from the ``html`` form field. Nothing is stored
    when the field is missing or no selected month is found, so an existing
    ``newMonth`` value is never replaced by an empty one. Failures are
    printed and the fixed response text is still returned.
    """
    try:
        html = request.form.get('html')
        if not html:
            print('error: missing html parameter')
            return 'success'
        doc = pq(html)
        endMonth = doc('select[id="endMonth"]>option[selected="selected"]').text()
        print(f'海关页面的月份{endMonth}')
        if endMonth:
            r.set('newMonth', endMonth)
    except Exception as e:
        print(f'error: {e}')
    return 'success'
if __name__ == '__main__':
    # Start the Flask server on port 8002 when the file is run as a script.
    app.run(port=8002)
import os
import sys
import logbook
import logbook.more
class LogRecord(object):
    """Build logbook loggers that write to a per-script log file and/or stderr."""

    # Log line format
    def logFormate(self, record, handler):
        """Format a log record as ``[time] [level] [file] [function] [line] message``.

        Args:
            record: Log record providing ``time``, ``level_name``,
                ``filename``, ``func_name``, ``lineno`` and ``message``.
            handler: Handler requesting the format; not used.

        Returns:
            The formatted log line.
        """
        formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
            date=record.time,                              # log time
            level=record.level_name,                       # log level
            filename=os.path.split(record.filename)[-1],   # file name without directory
            func_name=record.func_name,                    # function name
            lineno=record.lineno,                          # line number
            msg=record.message                             # log message
        )
        return formate

    @staticmethod
    def _log_file_name(script_name):
        """Return the log file name for a script: a trailing '.py' becomes '.log'."""
        # Only the suffix is removed; a '.py' inside the name is kept.
        if script_name.endswith(".py"):
            script_name = script_name[:-len(".py")]
        return script_name + ".log"

    # Build the logger
    def getLogger(self, fileLogFlag=True, stdOutFlag=True):
        """Create a logger named after the running script.

        The log file lives in a ``logs`` directory next to the script
        (``sys.argv[0]``); the directory is created when missing.

        Args:
            fileLogFlag: Attach a timed rotating file handler when true.
            stdOutFlag: Attach a colorized stderr handler when true.

        Returns:
            The configured ``logbook.Logger``.
        """
        dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
        dirname = os.path.join(dirname, "logs")
        filename = self._log_file_name(filename)
        # exist_ok avoids a failure when the directory appears between a
        # separate existence check and the creation.
        os.makedirs(dirname, exist_ok=True)
        logbook.set_datetime_format('local')
        logger = logbook.Logger(filename)
        logger.handlers = []
        if fileLogFlag:  # write log records to a file
            logFile = logbook.TimedRotatingFileHandler(os.path.join(dirname, filename), date_format='%Y-%m-%d',
                                                       bubble=True, encoding='utf-8')
            logFile.formatter = self.logFormate
            logger.handlers.append(logFile)
        if stdOutFlag:  # print log records to the screen
            logStd = logbook.more.ColorizedStderrHandler(bubble=True)
            logStd.formatter = self.logFormate
            logger.handlers.append(logStd)
        return logger
\ No newline at end of file
[param]
# Page size (records per page)
pageSize: 10
# Import/export type: 1 = import, 0 = export, 10 = import and export
iEType: 1,0,10
# Currency: rmb or usd
currencyType: usd
# Year
year: 2023
# Start month
startMonth: 7
# End month
endMonth: 7
#
monthFlag:
unitFlag: true
unitFlag1: true
# Code length
codeLength: 8
# Commodity-code field parameters
outerField1: CODE_TS
outerField2:
outerField3:
outerField4:
# Commodity-code values
outerValue1:
outerValue2:
outerValue3:
outerValue4:
orderType: CODE ASC DEFAULT
# Data format: currently set to 1 when downloading data from 202201 onward and to 2 for data before 2022; the format cut-over point is 202111 (2 before it)
selectTableState: 1
currentStartTime: 202201
This source diff could not be displayed because it is too large. You can view the blob instead.
import csv
import pandas as pd
# filePath=r'D:\hg\2023\08\累计\贸易伙伴\202301-08--贸易伙伴--进口-307.csv'
# # codes=[]
# with open(filePath) as csvfile:
#
# reader = csv.reader(csvfile)
# #跳过第一条数据
# # next(reader)
# try:
# for row in reader:
# # 进行数据处理
# print(row)
# except csv.Error as e:
# # 打印错误信息
# print(f'Error reading CSV file: {e}')
#
# filePathName='test.csv'
# # 打开输入文件和输出文件
# with open(filePath, 'r') as input_csv, open(filePathName, 'w', newline='') as output_csv:
# # 创建CSV读取器和写入器
# csv_reader = csv.reader(input_csv)
# csv_writer = csv.writer(output_csv)
# # 逐行读取输入文件,并将每一行写入输出文件
# for row in csv_reader:
# csv_writer.writerow(row)
def group_elements(codes, size=8):
    """Join consecutive codes into comma-separated groups.

    Args:
        codes: Sequence of code strings.
        size: Maximum number of codes per group (default 8).

    Returns:
        A list of strings, each holding up to ``size`` codes joined by ",";
        the last group may be shorter. An empty input yields an empty list.

    Raises:
        ValueError: If ``size`` is smaller than 1.
    """
    if size < 1:
        raise ValueError('size must be a positive integer')
    return [','.join(codes[start:start + size])
            for start in range(0, len(codes), size)]
# Quick manual check: 11 sample codes are grouped and the result is printed.
cc=['12','13','12','15','1','2','3','4','6','8','9']
aa=group_elements(codes=cc)
print(aa)
1.海关采集说明
海关的信息采集主要是下载对象的数据信息
难点是如何获取并保存cookie,因为海关网站的cookie使用一次后就会失效
海关网站每次最多只能下载1万条数据,因此下载数据时需要根据条件对code参数进行拆分
2.数据的保存:数据中会存在换行、格式不对等问题,保存前需要先进行清洗
1.cookie保存方法
模拟浏览器请求海关网站首页获取到cookie 并将cookie保存到redis中,使用一条删除一条
2.下载接口
下载参数
outerField1: CODE_TS #商品编码
outerField2: ORIGIN_COUNTRY #贸易伙伴
outerField3: TRADE_MODE #贸易方式
outerField4: TRADE_CO_PORT #收发货地址
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论