diff --git a/lianjia.py b/lianjia.py index 4182559..fac7ee8 100644 --- a/lianjia.py +++ b/lianjia.py @@ -7,6 +7,7 @@ import sys import textwrap import time +from multiprocessing import Pool from urllib.parse import urlparse import pandas as pd @@ -452,17 +453,17 @@ def get_merged_dict(xiaoqu_detail): return final_result -def to_excel(province_name, city, area): +def to_excel(province_name, city_name, area_name): current_timestamp = time.time() current_timestamp = int(current_timestamp) file_path = f'{province_name}数据_{current_timestamp}.xlsx' lj_base_areas_sql = f"lj_base_areas" - if city: - lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city}' )" - file_path = f'{province_name}-{city}数据_{current_timestamp}.xlsx' - if area: - lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area}')" - file_path = f'{province_name}-{city}-{area}数据_{current_timestamp}.xlsx' + if city_name: + lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city_name}' )" + file_path = f'{province_name}-{city_name}数据_{current_timestamp}.xlsx' + if area_name: + lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area_name}')" + file_path = f'{province_name}-{city_name}-{area_name}数据_{current_timestamp}.xlsx' else: lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' )" @@ -515,67 +516,79 @@ def to_excel(province_name, city, area): def process_elements(all_xiaoqu): - current_process = multiprocessing.current_process() - current_process.name = "Lianjia_Process" - xiaoqu_size = len(all_xiaoqu) - for index, xiaoqu in enumerate(all_xiaoqu): - xiaoqu_id = xiaoqu['xiaoqu_id'] - db.delete(table='lj_xiaoqu_detail', condition=f" xiaoqu_id = '{xiaoqu_id}'") - xiaoqu_url = xiaoqu['xiaoqu_url'] - print2(f"{current_process.name}==>[{index}/{xiaoqu_size}]{xiaoqu_url}") - xiaoqu_detail = get_community_detail(url=xiaoqu_url) - merged_dict = get_merged_dict(xiaoqu_detail) - - # [{'label': '建筑类型', 'value': '塔楼/板楼/塔板结合/平房'}, - # {'label': '房屋总数', 'value': '250户'}, - # {'label': '楼栋总数', 'value': '7栋'}, - # {'label': '绿化率', 'value': '15%'}, - # {'label': '容积率', 'value': '2.2'}, - # {'label': '交易权属', 'value': '商品房/房改房'}, - # {'label': '建成年代', 'value': '暂无信息'}, - # {'label': '供暖类型', 'value': '集中供暖/自采暖'}, - # {'label': '用水类型', 'value': '民水'}, - # {'label': '用电类型', 'value': '民电'}, - # {'label': '挂牌均价', 'value': 11376}, - # {'label': '物业费', 'value': '0.1至0.45元/平米/月'}, - # {'label': '经纬度', 'value': '113.676176,34.782529'}] - if xiaoqu_detail: - insert_detail = { - 'xiaoqu_id': xiaoqu_id, - 'detail_json': json.dumps(merged_dict, ensure_ascii=False) - } - db.insert(table='lj_xiaoqu_detail', data=insert_detail) + try: + current_process = multiprocessing.current_process() + current_process.name = "Lianjia_Process" + xiaoqu_size = len(all_xiaoqu) + for index, xiaoqu in enumerate(all_xiaoqu): + xiaoqu_id = xiaoqu['xiaoqu_id'] + db.delete(table='lj_xiaoqu_detail', condition=f" xiaoqu_id = '{xiaoqu_id}'") + xiaoqu_url = xiaoqu['xiaoqu_url'] + print2(f"{current_process.name}==>[{index}/{xiaoqu_size}]{xiaoqu_url}") + xiaoqu_detail = get_community_detail(url=xiaoqu_url) + merged_dict = get_merged_dict(xiaoqu_detail) + + # [{'label': '建筑类型', 'value': '塔楼/板楼/塔板结合/平房'}, + # {'label': '房屋总数', 'value': '250户'}, + # {'label': '楼栋总数', 'value': '7栋'}, + # {'label': '绿化率', 'value': '15%'}, + # {'label': '容积率', 'value': '2.2'}, + # {'label': '交易权属', 'value': '商品房/房改房'}, + # {'label': '建成年代', 'value': '暂无信息'}, + # {'label': '供暖类型', 'value': '集中供暖/自采暖'}, + # {'label': '用水类型', 'value': '民水'}, + # {'label': '用电类型', 'value': '民电'}, + # {'label': '挂牌均价', 'value': 11376}, + # {'label': '物业费', 'value': '0.1至0.45元/平米/月'}, + # {'label': '经纬度', 'value': '113.676176,34.782529'}] + if xiaoqu_detail: + insert_detail = { + 'xiaoqu_id': xiaoqu_id, + 'detail_json': json.dumps(merged_dict, ensure_ascii=False) + } + db.insert(table='lj_xiaoqu_detail', data=insert_detail) + except Exception as e: + return f"Exception in child process: {str(e)}" def process_list(input_list): cpu_core_num = psutil.cpu_count(logical=False) split_result = split_list(input_list, cpu_core_num) - process_elements(input_list) - - # if sys_platform == 'win32': # windows系统不采用多进程形式跑任务 - # process_elements(input_list) - # else: - # p = Pool() - # for xiaoqu_list in split_result: - # p.apply_async(process_elements, args=(xiaoqu_list,)) - # print2('Waiting for all subprocesses done...') - # p.close() - # p.join() - # print2('All subprocesses done.') - - -def spider_by_condition(province, city=None, area=None): - echo_msg = f"开始采集[{province}" + if sys_platform == 'win32': # windows系统不采用多进程形式跑任务 + process_elements(input_list) + else: + p = Pool() + subprocess_results = [] + for xiaoqu_list in split_result: + result = p.apply_async(process_elements, args=(xiaoqu_list,)) + subprocess_results.append(result) + print2('Waiting for all subprocesses done...') + p.close() + p.join() + + for result in subprocess_results: + try: + res = result.get() + if res: + print(res) # 打印子进程的异常信息 + except Exception as e: + print(f"Error in child process: {str(e)}") + + print('All subprocesses done.') + + +def spider_by_condition(province_name, city_name=None, area_name=None): + echo_msg = f"开始采集[{province_name}" lj_base_areas_sql = f"lj_base_areas" - if city: - echo_msg += f"-{city}" - lj_base_province_sql = f"(select * from lj_base_province where province_name='{province}' and city_name='{city}' )" - if area: - echo_msg += f"-{area}" - lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area}')" + if city_name: + echo_msg += f"-{city_name}" + lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' and city_name='{city_name}' )" + if area_name: + echo_msg += f"-{area_name}" + lj_base_areas_sql = f"(select * from lj_base_areas t where region_name='{area_name}')" else: - lj_base_province_sql = f"(select * from lj_base_province where province_name='{province}' )" + lj_base_province_sql = f"(select * from lj_base_province where province_name='{province_name}' )" echo_msg += f"]区域下数据..." print2(echo_msg) @@ -625,7 +638,9 @@ def statistics_info(): """ all_xiaoqu = db.query(sql) statistics_df = pd.DataFrame(all_xiaoqu) - print2("\n", statistics_df.to_string(index=False)) + print("======================统计信息======================") + print(statistics_df.to_string(index=False)) + print("===================================================") def print_disclaimer(): @@ -660,8 +675,8 @@ def run(): area = input("请输入省份下城市下区域名称(可选): ") if province: db_init(province_name=province, city_name=city) - spider_by_condition(province=province, city=city, area=area) - to_excel(province, city, area) + spider_by_condition(province_name=province, city_name=city, area_name=area) + to_excel(province_name=province, city_name=city, area_name=area) def main(): diff --git a/requirements.txt b/requirements.txt index 6c949b2..28024a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,13 @@ certifi==2023.7.22 charset-normalizer==3.3.1 +et-xmlfile==1.1.0 idna==3.4 lxml==4.9.3 numpy==1.24.4 +openpyxl==3.1.2 pandas==2.0.3 psutil==5.9.6 +pypinyin==0.49.0 python-dateutil==2.8.2 pytz==2023.3.post1 requests==2.31.0