提交 1d6ffed3 作者: 薛凌堃

年报topic调整

上级 41161484
...@@ -36,7 +36,7 @@ def sendKafka(dic_news): ...@@ -36,7 +36,7 @@ def sendKafka(dic_news):
start_time = time.time() start_time = time.time()
try: # 114.116.116.241 try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic", kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
......
...@@ -41,7 +41,7 @@ def sendKafka(dic_news): ...@@ -41,7 +41,7 @@ def sendKafka(dic_news):
start_time = time.time() start_time = time.time()
try: # 114.116.116.241 try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic", kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
......
...@@ -36,7 +36,7 @@ def sendKafka(dic_news): ...@@ -36,7 +36,7 @@ def sendKafka(dic_news):
start_time = time.time() start_time = time.time()
try: # 114.116.116.241 try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092']) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic", kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
......
...@@ -36,7 +36,7 @@ def sendKafka(dic_news): ...@@ -36,7 +36,7 @@ def sendKafka(dic_news):
start_time = time.time() start_time = time.time()
try: # 114.116.116.241 try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092']) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("policy", kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
......
...@@ -192,7 +192,7 @@ def spider(com_name,cik,up_okCount): ...@@ -192,7 +192,7 @@ def spider(com_name,cik,up_okCount):
# 将相应字段通过kafka传输保存 # 将相应字段通过kafka传输保存
try: try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],compression_type='gzip',batch_size=1638400,linger_ms=1,buffer_memory=33445532*2,max_request_size=1024*1024*50) #,batch_size=20480000,buffer_memory=64000000) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],compression_type='gzip',batch_size=1638400,linger_ms=1,buffer_memory=33445532*2,max_request_size=1024*1024*50) #,batch_size=20480000,buffer_memory=64000000)
kafka_result = producer.send("researchReportTopic", kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
log.info(kafka_result.get(timeout=10)) log.info(kafka_result.get(timeout=10))
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论