提交 8651416b 作者: 刘伟刚

新浪财经-纽交所财务数据脚本

上级 beaf2f01
import configparser
import configparser
import re
import time
from urllib.parse import quote, unquote
import pymysql
import redis
import requests
import json
from pyquery import PyQuery as pq
from bs4 import BeautifulSoup
import difflib
import urllib3
from seleniumwire import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from base.BaseCore import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from operator import itemgetter
from itertools import groupby
import datetime
class SinaUsstock(object):
def __init__(self):
baseCore=BaseCore()
self.logger=baseCore.getLogger()
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=6)
self.driver=self.get_webdriver()
def conn11(self):
conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
charset='utf8')
cursor = conn.cursor()
return conn,cursor
def deal_table(self,doc_resp):
soup=BeautifulSoup(doc_resp,'html.parser')
tdoc=soup.select('div[class="tbl_wrap"]>table[class="data_tbl os_tbl"]')[1]
tbody=pq(str(tdoc))
uint=tbody.find('tbody>tr:nth-child(1)>th').text().split(':')[1]
pdate=tbody.find('tbody>tr:nth-child(1)>td').text().replace('至','').split(' ')
btds=tbody.find('tbody>tr:gt(0)')
seriesList=[]
for btd in btds:
tddoc=pq(btd)
seriesName=tddoc.find('th').text().replace('+','').replace('-','')
seriesValue=tddoc.find('td').text().split(' ')
for i in range(0,len(pdate)):
value=seriesValue[i]
if '亿' in value:
value = value.replace("亿", "*100000000")
value = eval(value)
elif '万' in value:
value = value.replace("万", "*10000")
value = eval(value)
vvla=str(value)
serisemsg={
'name':seriesName,
'value':vvla,
'ddte':pdate[i],
'uint':uint,
}
seriesList.append(serisemsg)
return seriesList
def getCodeFromRedis(self):
securitiescode=self.r.lpop('sina_usstock:securities_code')
securitiescode = securitiescode.decode('utf-8')
return securitiescode
def get_webdriver(self):
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
# chrome_options.add_argument('--headless')
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
executable_path =self.config.get('selenium', 'chrome_driver')
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
return driver
# 使用股票代码拼接地址访问新浪财经地址,通过定位现金浏览,资产负债,利润信息
#若执行中出现异常则将股票代码放回redis中,
def get_content2(self,securitiescode):
self.logger.info(f"需要采集的股票代码{securitiescode}")
conn,cursor=self.conn11()
try:
sql1 = f"select social_credit_code,securities_code,securities_short_name from sys_base_enterprise_ipo where securities_code='{securitiescode}' " # and stock_code = "SYNH"
cursor.execute(sql1)
result_data = cursor.fetchall()
except Exception as e:
self.logger.info("数据查询异常!")
return
for data in result_data:
try:
data_list = list(data)
social_credit_code = data_list[0]
stock = data_list[1]
stock2=str(stock)
url=f'http://quotes.sina.com.cn/usstock/hq/balance.php?s={stock2}'
self.driver.get(url)
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
#1.现金 cash 年度 季度
#点击现金标签加载页面 点击年度 解析数据 现金流量表
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="现金流量表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList1=self.deal_table(doc_resp1)
zbl1=self.groupdata(seriesList1,'cash')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList2=self.deal_table(doc_resp1)
zbl2=self.groupdata(seriesList2,'cash')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#2.资产 debt 年度 季度
#点击资产标签加载页面 点击年度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="资产负债表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList3=self.deal_table(doc_resp1)
zbl3=self.groupdata(seriesList3,'debt')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList4=self.deal_table(doc_resp1)
zbl4=self.groupdata(seriesList4,'debt')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击利资产标签加载页面 点击季度 解析数据
#3.利润 profit 年度 季度
#点击利润标签加载页面 点击年度 解析数据
#点击利润标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="利润表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList5=self.deal_table(doc_resp1)
zbl5=self.groupdata(seriesList5,'profit')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList6=self.deal_table(doc_resp1)
zbl6=self.groupdata(seriesList6,'profit')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#转换数据格式发送接口
annualzb=zbl1+zbl3+zbl5
annualzb=self.groupZbData(annualzb,stock,social_credit_code,'annual')
self.sendToFinance(annualzb)
quarterzb=zbl2+zbl4+zbl6
quarterzb=self.groupZbData(quarterzb,stock,social_credit_code,'quarter')
self.sendToFinance(quarterzb)
self.logger.info(f'++++++++++股票:{stock}采集结束')
except Exception as e:
self.driver.quit()
time.sleep(3)
self.driver=self.get_webdriver()
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
def sendToFinance(self,zbmsg):
for zbb in zbmsg:
if len(zbb) != 0:
# 调凯歌接口存储数据
data = json.dumps(zbb)
#暂无接口
url_baocun = ''
# url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
for nnn in range(0, 3):
try:
res_baocun = requests.post(url_baocun, data=data)
self.logger.info(res_baocun.text)
break
except:
time.sleep(1)
#zbList,stock,social_credit_code
def groupZbData(self,zbList,stock,social_credit_code,dateFlag):
self.logger.info('数据根据日期进行组合')
# 根据 date对数据分组
# 根据时间属性对列表内容进行分类
zbList.sort(key=itemgetter('date')) # 先按照age属性进行排序
zbgroups = groupby(zbList, key=itemgetter('date')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
zbList=[]
for date, group in zbgroups:
result={}
for item in group:
for key, value in item.items():
if key == "date":
continue
if key not in result:
result[key] = []
result[key].extend(value)
result["date"] = date
result["securitiesCode"] = stock
result["socialCreditCode"] = social_credit_code
result["dateFlag"] = dateFlag
result["ynFirst"] = False
# "securitiesCode": "2342",
# "socialCreditCode": "12314",
# "date": "2023-06-31",
# "dateFlag": "quarter",
# "ynFirst": false
zbList.append(result)
return zbList
#表数据和对应财务指标
def groupdata(self,ssMsg,zbtype):
self.logger.info('对数据进行指标数据进分组')
# 根据时间属性对列表内容进行分类
ssMsg.sort(key=itemgetter('ddte')) # 先按照age属性进行排序
groups = groupby(ssMsg, key=itemgetter('ddte')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
zbList=[]
for ddte, group in groups:
# print(f"ddte: {ddte}")
ssList=[]
for item in group:
ii={
"enName": "",
"name": item['name'],
"value": item['value'],
"uint": item['uint']
}
ssList.append(ii)
ddrit={
"date":ddte,
zbtype:ssList
}
zbList.append(ddrit)
return zbList
def getFormatedate(self,timestamp):
date = datetime.datetime.fromtimestamp(timestamp)
formatted_date = date.strftime('%Y-%m-%d')
return formatted_date
if __name__ == '__main__':
sinaUsstock=SinaUsstock()
# securitiescode= sinaUsstock.r.lpop('sina_usstock:securities_code')
securitiescode= sinaUsstock.getCodeFromRedis()
securitiescode='AAPL'
try:
sinaUsstock.get_content2(securitiescode)
except Exception as e:
sinaUsstock.r.rpush('sina_usstock:securities_code',securitiescode)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
if year>2021:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
if year>2021:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
if endMonth==1:
startMonths=[1]
else:
startMonths=[1,endMonth]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
else:
while len(fieldFileList)< 24:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 24:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
if year==2022 and endMonth==1:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
if year==2022 and endMonth==1:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
startMonths=[endMonth]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
# if year>2021:
# selectTableState= 1 #202201前的数据为2 后的数据是1
# else:
# selectTableState= 2 #202201前的数据为2 后的数据是1
selectTableState= 3
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
# if year>2021:
# selectTableState= 1 #202201前的数据为2 后的数据是1
# else:
# selectTableState= 2 #202201前的数据为2 后的数据是1
selectTableState= 3
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
startMonths=[1]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
......@@ -17,9 +17,11 @@ outerField3: TRADE_MODE #贸易方式
outerField4: TRADE_CO_PORT #收发货地址
海关网站的数据分类
1、近期数据库 2022年1月之后的数据
2、老数据库 2022年1月之前的数据
3、跨库的数据 2022年的累计数据
......
import json
from operator import itemgetter
from itertools import groupby
files=r'D:\hg\3.json'
# 读取JSON文件
with open(files, 'r') as file:
data = json.load(file)
data=data['facts']['us-gaap']
zb=[]
# 遍历字典的key值
for key in data.keys():
accounts=data[key]['units']
for kk in accounts.keys():
accounts=accounts[kk]
for account in accounts:
end=str(account['end'])
val=str(account['val'])
fp=str(account['fp'])
form=str(account['form'])
zhibiao={
'zbname':key,
'riqi':end,
'jine':val,
'fp':fp,
'form':form,
}
zb.append(zhibiao)
#找10-K的数据
# 根据age属性对列表内容进行分类
zb.sort(key=itemgetter('form')) # 先按照age属性进行排序
forms = groupby(zb, key=itemgetter('form')) # 根据age属性进行分组
form_25_data = []
for form, zz in forms:
if '10-K' in form :
form_25_data.extend(list(zz))
# 根据age属性对列表内容进行分类
form_25_data.sort(key=itemgetter('riqi')) # 先按照age属性进行排序
groups = groupby(form_25_data, key=itemgetter('riqi')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
for riqi, group in groups:
print(f"riqi: {riqi}")
for item in group:
print(item)
print()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论