Skip to content

Commit

Permalink
update:优化非win多线程,获取异常并打印
Browse files Browse the repository at this point in the history
  • Loading branch information
fangzheng committed Dec 2, 2023
1 parent cfcd30b commit 2818374
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 63 deletions.
141 changes: 78 additions & 63 deletions lianjia.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import textwrap
import time
from multiprocessing import Pool
from urllib.parse import urlparse

import pandas as pd
Expand Down Expand Up @@ -452,17 +453,17 @@ def get_merged_dict(xiaoqu_detail):
return final_result


def to_excel(province_name, city, area):
def to_excel(province_name, city_name, area_name):
current_timestamp = time.time()
current_timestamp = int(current_timestamp)
file_path = f'{province_name}数据_{current_timestamp}.xlsx'
lj_base_areas_sql = f"lj_base_areas"
if city:
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city}' )"
file_path = f'{province_name}-{city}数据_{current_timestamp}.xlsx'
if area:
lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area}')"
file_path = f'{province_name}-{city}-{area}数据_{current_timestamp}.xlsx'
if city_name:
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city_name}' )"
file_path = f'{province_name}-{city_name}数据_{current_timestamp}.xlsx'
if area_name:
lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area_name}')"
file_path = f'{province_name}-{city_name}-{area_name}数据_{current_timestamp}.xlsx'
else:
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' )"

Expand Down Expand Up @@ -515,67 +516,79 @@ def to_excel(province_name, city, area):


def process_elements(all_xiaoqu):
current_process = multiprocessing.current_process()
current_process.name = "Lianjia_Process"
xiaoqu_size = len(all_xiaoqu)
for index, xiaoqu in enumerate(all_xiaoqu):
xiaoqu_id = xiaoqu['xiaoqu_id']
db.delete(table='lj_xiaoqu_detail', condition=f" xiaoqu_id = '{xiaoqu_id}'")
xiaoqu_url = xiaoqu['xiaoqu_url']
print2(f"{current_process.name}==>[{index}/{xiaoqu_size}]{xiaoqu_url}")
xiaoqu_detail = get_community_detail(url=xiaoqu_url)
merged_dict = get_merged_dict(xiaoqu_detail)

# [{'label': '建筑类型', 'value': '塔楼/板楼/塔板结合/平房'},
# {'label': '房屋总数', 'value': '250户'},
# {'label': '楼栋总数', 'value': '7栋'},
# {'label': '绿化率', 'value': '15%'},
# {'label': '容积率', 'value': '2.2'},
# {'label': '交易权属', 'value': '商品房/房改房'},
# {'label': '建成年代', 'value': '暂无信息'},
# {'label': '供暖类型', 'value': '集中供暖/自采暖'},
# {'label': '用水类型', 'value': '民水'},
# {'label': '用电类型', 'value': '民电'},
# {'label': '挂牌均价', 'value': 11376},
# {'label': '物业费', 'value': '0.1至0.45元/平米/月'},
# {'label': '经纬度', 'value': '113.676176,34.782529'}]
if xiaoqu_detail:
insert_detail = {
'xiaoqu_id': xiaoqu_id,
'detail_json': json.dumps(merged_dict, ensure_ascii=False)
}
db.insert(table='lj_xiaoqu_detail', data=insert_detail)
try:
current_process = multiprocessing.current_process()
current_process.name = "Lianjia_Process"
xiaoqu_size = len(all_xiaoqu)
for index, xiaoqu in enumerate(all_xiaoqu):
xiaoqu_id = xiaoqu['xiaoqu_id']
db.delete(table='lj_xiaoqu_detail', condition=f" xiaoqu_id = '{xiaoqu_id}'")
xiaoqu_url = xiaoqu['xiaoqu_url']
print2(f"{current_process.name}==>[{index}/{xiaoqu_size}]{xiaoqu_url}")
xiaoqu_detail = get_community_detail(url=xiaoqu_url)
merged_dict = get_merged_dict(xiaoqu_detail)

# [{'label': '建筑类型', 'value': '塔楼/板楼/塔板结合/平房'},
# {'label': '房屋总数', 'value': '250户'},
# {'label': '楼栋总数', 'value': '7栋'},
# {'label': '绿化率', 'value': '15%'},
# {'label': '容积率', 'value': '2.2'},
# {'label': '交易权属', 'value': '商品房/房改房'},
# {'label': '建成年代', 'value': '暂无信息'},
# {'label': '供暖类型', 'value': '集中供暖/自采暖'},
# {'label': '用水类型', 'value': '民水'},
# {'label': '用电类型', 'value': '民电'},
# {'label': '挂牌均价', 'value': 11376},
# {'label': '物业费', 'value': '0.1至0.45元/平米/月'},
# {'label': '经纬度', 'value': '113.676176,34.782529'}]
if xiaoqu_detail:
insert_detail = {
'xiaoqu_id': xiaoqu_id,
'detail_json': json.dumps(merged_dict, ensure_ascii=False)
}
db.insert(table='lj_xiaoqu_detail', data=insert_detail)
except Exception as e:
return f"Exception in child process: {str(e)}"


def process_list(input_list):
cpu_core_num = psutil.cpu_count(logical=False)
split_result = split_list(input_list, cpu_core_num)

process_elements(input_list)

# if sys_platform == 'win32': # windows系统不采用多进程形式跑任务
# process_elements(input_list)
# else:
# p = Pool()
# for xiaoqu_list in split_result:
# p.apply_async(process_elements, args=(xiaoqu_list,))
# print2('Waiting for all subprocesses done...')
# p.close()
# p.join()
# print2('All subprocesses done.')


def spider_by_condition(province, city=None, area=None):
echo_msg = f"开始采集[{province}"
if sys_platform == 'win32': # windows系统不采用多进程形式跑任务
process_elements(input_list)
else:
p = Pool()
subprocess_results = []
for xiaoqu_list in split_result:
result = p.apply_async(process_elements, args=(xiaoqu_list,))
subprocess_results.append(result)
print2('Waiting for all subprocesses done...')
p.close()
p.join()

for result in subprocess_results:
try:
res = result.get()
if res:
print(res) # 打印子进程的异常信息
except Exception as e:
print(f"Error in child process: {str(e)}")

print('All subprocesses done.')


def spider_by_condition(province_name, city_name=None, area_name=None):
echo_msg = f"开始采集[{province_name}"
lj_base_areas_sql = f"lj_base_areas"
if city:
echo_msg += f"-{city}"
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province}' and city_name='{city}' )"
if area:
echo_msg += f"-{area}"
lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area}')"
if city_name:
echo_msg += f"-{city_name}"
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city_name}' )"
if area_name:
echo_msg += f"-{area_name}"
lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area_name}')"
else:
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province}' )"
lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' )"
echo_msg += f"]区域下数据..."
print2(echo_msg)

Expand Down Expand Up @@ -625,7 +638,9 @@ def statistics_info():
"""
all_xiaoqu = db.query(sql)
statistics_df = pd.DataFrame(all_xiaoqu)
print2("\n", statistics_df.to_string(index=False))
print("======================统计信息======================")
print(statistics_df.to_string(index=False))
print("===================================================")


def print_disclaimer():
Expand Down Expand Up @@ -660,8 +675,8 @@ def run():
area = input("请输入省份下城市下区域名称(可选): ")
if province:
db_init(province_name=province, city_name=city)
spider_by_condition(province=province, city=city, area=area)
to_excel(province, city, area)
spider_by_condition(province_name=province, city_name=city, area_name=area)
to_excel(province_name=province, city_name=city, area_name=area)


def main():
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
certifi==2023.7.22
charset-normalizer==3.3.1
et-xmlfile==1.1.0
idna==3.4
lxml==4.9.3
numpy==1.24.4
openpyxl==3.1.2
pandas==2.0.3
psutil==5.9.6
pypinyin==0.49.0
python-dateutil==2.8.2
pytz==2023.3.post1
requests==2.31.0
Expand Down

0 comments on commit 2818374

Please sign in to comment.