#!/opt/miniconda3/bin/python3
"""Helpers that convert Excel workbooks into CSV and JSON files."""
import os
import time
import datetime
import glob
import re
import json

import pandas
import xlrd
import openpyxl as xl
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell import MergedCell


def debug(*values: object, sep: str = None, end: str = None):
    """Debug trace hook with the same signature as ``print``.

    Output is currently disabled, so this is a no-op; enable the ``print``
    line below to turn tracing on.
    """
    # print(*values, sep=sep, end=end)
    return


def info(*values: object, sep: str = None, end: str = None):
    """Print an informational message (thin wrapper around ``print``)."""
    print(*values, sep=sep, end=end)
    return


def _ensure_parent_dir(filename):
    """Create the directory that will hold *filename*, if it names one."""
    parent = os.path.dirname(filename)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _json_default(value):
    """Fallback encoder for date/time cell values ``json`` cannot serialize.

    Raises:
        TypeError: for any other unsupported type, as ``json.dumps`` expects.
    """
    date_like = (
        datetime.datetime,
        datetime.date,
        datetime.time,
        datetime.timedelta,
    )
    if isinstance(value, date_like):
        return str(value)
    raise TypeError(
        "Object of type {} is not JSON serializable".format(type(value).__name__)
    )


def _write_json(json_obj, jsonfilename):
    """Serialize *json_obj* (indented, non-ASCII kept) to *jsonfilename*."""
    json_str = json.dumps(
        json_obj, indent=4, ensure_ascii=False, default=_json_default
    )
    debug(json_str)
    _ensure_parent_dir(jsonfilename)
    with open(jsonfilename, mode='w', encoding='utf8') as fd:
        fd.write(json_str)


# Convert an Excel workbook to CSV, for one named sheet.
def excel2csv_1(sheet_name, excelfilename, csvfilename):
    """Write sheet *sheet_name* of *excelfilename* to *csvfilename* as CSV.

    Args:
        sheet_name: name of the sheet to export.
        excelfilename: path of the workbook, opened with ``xlrd``.
        csvfilename: output path; its parent directory is created if needed.
    """
    # xlrd log output is sent to the null device; the handle is closed once
    # the sheet has been read instead of being leaked.
    with open(os.devnull, 'w') as devnull:
        wb = xlrd.open_workbook(excelfilename, logfile=devnull)
        df = pandas.read_excel(wb, sheet_name=sheet_name)
    _ensure_parent_dir(csvfilename)
    df.to_csv(csvfilename)


# Handle merged cells: every cell of a merged range yields the same content.
def parse_merged_cell(sheet: Worksheet, row, col):
    """Return the cell at (*row*, *col*), resolving merged cells.

    If the addressed cell is a ``MergedCell``, the top-left cell of the merged
    range containing it is returned instead, so callers read that cell's
    value. If no range contains it, the merged cell itself is returned.
    """
    cell = sheet.cell(row=row, column=col)
    if not isinstance(cell, MergedCell):
        return cell
    for merged_range in sheet.merged_cells.ranges:
        if cell.coordinate in merged_range:
            return sheet.cell(
                row=merged_range.min_row, column=merged_range.min_col
            )
    return cell


def _collect_key_value_rows(sheet, key_rows, columns, target):
    """Copy key/value pairs from *sheet* into the dict *target*.

    For each row in *key_rows* and each column in *columns*, the cell is the
    key and the cell directly below it is the value. Cells whose key is empty
    (falsy) are skipped.
    """
    for row_index in key_rows:
        for col_index in columns:
            key = sheet.cell(row=row_index, column=col_index).value
            # NOTE: the original also tested the value *cell object*, which is
            # always truthy, so only the key decides whether a pair is kept.
            if key:
                target[key] = sheet.cell(
                    row=row_index + 1, column=col_index
                ).value


# Convert an Excel workbook to JSON: extract specific traffic summary data.
def excel2json_1(excelfilename, jsonfilename):
    """Extract fixed key/value blocks from sheet "汇总数据" into a JSON file.

    Layout read from the sheet (keys in the listed row, values one row below):
      * rows 2 and 4, columns 1-9   -> nested under "核心网A平面"
      * rows 8 and 10, all columns  -> nested under "核心网B平面"
      * rows 28 and 40, all columns -> stored at the top level

    Args:
        excelfilename: path of the workbook, loaded with cached values
            (``data_only=True``).
        jsonfilename: output path; its parent directory is created if needed.
    """
    json_obj = {}
    workbook = xl.load_workbook(excelfilename, data_only=True)
    sheet = workbook["汇总数据"]

    def all_columns():
        # Evaluated per section because sheet.cell() may create cells.
        # The stop value is max_column + 1 so the last column is included;
        # range() excludes its stop, and the original skipped that column.
        return range(1, sheet.max_column + 1)

    plane_a = json_obj["核心网A平面"] = {}
    _collect_key_value_rows(sheet, (2, 4), range(1, 10), plane_a)

    plane_b = json_obj["核心网B平面"] = {}
    _collect_key_value_rows(sheet, (8, 10), all_columns(), plane_b)

    _collect_key_value_rows(sheet, (28,), all_columns(), json_obj)
    _collect_key_value_rows(sheet, (40,), all_columns(), json_obj)

    _write_json(json_obj, jsonfilename)


def _clean_cell_value(value):
    """Remove embedded newlines from string values; pass others through."""
    if isinstance(value, str):
        return value.replace("\n", "")
    return value


def _sheet_to_records(sheet):
    """Turn a worksheet into a list of dicts keyed by the first row.

    Row 1 supplies the keys; a column whose header is ``None`` is keyed
    ``colN`` (1-based). Cells whose value is ``None`` are left out of the
    record. Merged cells are resolved through ``parse_merged_cell``.
    """
    # Read the bounds once; the cell lookups below stay inside them.
    max_row = sheet.max_row
    max_column = sheet.max_column
    header = []
    records = []
    for row_index in range(1, max_row + 1):
        raw_values = [
            parse_merged_cell(sheet, row_index, col_index).value
            for col_index in range(1, max_column + 1)
        ]
        row = [_clean_cell_value(value) for value in raw_values]
        if row_index == 1:
            header = row
        # A row counts as empty when no raw value is truthy, so a row holding
        # only 0 / "" / False is skipped as well (original behaviour).
        if not any(raw_values):
            continue
        if row_index == 1:
            for val in row:
                debug(val, " \t", end="")
            debug()
            continue
        record = {}
        for index, val in enumerate(row):
            debug(type(val).__name__ + ":" + str(val), " \t", end="")
            if val is None:
                continue
            key = header[index]
            if key is None:
                key = 'col{}'.format(index + 1)
            record[key] = val
        records.append(record)
        debug()
    return records


# Convert an Excel workbook to JSON, handling merged-cell data.
def excel2json_2(excelfilename, jsonfilename):
    """Dump every non-empty sheet of a workbook to JSON.

    The output maps each sheet name to a list of row records (see
    ``_sheet_to_records``); sheets that yield no records are omitted. Date and
    time values are written as their ``str()`` form.

    Args:
        excelfilename: path of the workbook, loaded with cached values
            (``data_only=True``).
        jsonfilename: output path; its parent directory is created if needed.
    """
    json_obj = {}
    workbook = xl.load_workbook(excelfilename, data_only=True)
    for sheetname in workbook.sheetnames:
        debug(sheetname)
        records = _sheet_to_records(workbook[sheetname])
        if records:
            json_obj[sheetname] = records
    _write_json(json_obj, jsonfilename)
# Overwrite result files that already exist.
overwrite = False

# Input tree that is scanned and output tree that receives converted files.
_RECENT_DIR = "report/recent/"
_FILES_DIR = "report/files/"

# Converted files older than this many days are deleted.
_RETENTION_DAYS = 10

# (path pattern, log label, output extension, converter(source, target)).
# The extension dot is escaped and the pattern is end-anchored so that only
# real workbook names match (not e.g. "x.xls.bak"). The JSON converters are
# restricted to .xlsx, the extension their output name was derived from.
_CONVERSION_RULES = (
    (
        re.compile(r'.*/系统一部/SHVPSGSDB1.*\.xlsx?$'),
        '系统一部 Excel转CSV',
        '.csv',
        lambda source, target: excel2csv_1("历史性能", source, target),
    ),
    (
        re.compile(r'.*/基础设施部/.*\.xlsx$'),
        '基础设施部 Excel转JSON',
        '.json',
        lambda source, target: excel2json_2(source, target),
    ),
    (
        re.compile(r'.*/网络与平台部/关键设备CPU内存使用率统计日报报表.*\.xlsx?$'),
        '网络与平台部 Excel转CSV',
        '.csv',
        lambda source, target: excel2csv_1("Sheet1", source, target),
    ),
    (
        re.compile(r'.*/网络与平台部/支付系统流量日报.*\.xlsx$'),
        '网络与平台部 Excel转JSON',
        '.json',
        lambda source, target: excel2json_1(source, target),
    ),
)


def _target_path(source, extension):
    """Map a path under report/recent/ to report/files/ with *extension*.

    Only the real file extension is swapped, so "a.xlsx" becomes "a.csv"
    rather than "a.csvx", and directory names containing ".xls" are untouched.
    """
    stem, _old_extension = os.path.splitext(source)
    return stem.replace(_RECENT_DIR, _FILES_DIR, 1) + extension


def _purge_old_results():
    """Delete converted files past the retention period and prune empty dirs.

    Pruning stops at report/files itself: the output root is never removed
    and the walk never climbs above it. The original walk could reach an
    empty dirname and crash in os.listdir("").
    """
    cutoff = time.time() - 3600 * 24 * _RETENTION_DAYS
    root_prefix = os.path.normpath(_FILES_DIR) + os.sep
    for path in glob.glob("report/files/**", recursive=True):
        if not (os.path.isfile(path) and os.path.getmtime(path) < cutoff):
            continue
        os.remove(path)
        # Remove directories left empty by the deletion, innermost first.
        parent = os.path.normpath(os.path.dirname(path))
        while parent.startswith(root_prefix) and not os.listdir(parent):
            os.rmdir(parent)
            parent = os.path.dirname(parent)


def main():
    """Convert new reports under report/recent/ and purge stale results.

    Every path found under report/recent/ is checked against each conversion
    rule. A matching file is converted into report/files/ unless the target
    already exists and ``overwrite`` is false. Afterwards, converted files
    older than the retention period are removed. Paths are relative to the
    current working directory.

    Returns:
        None.
    """
    # Scan the input tree.
    for source in glob.glob("report/recent/**", recursive=True):
        for pattern, label, extension, convert in _CONVERSION_RULES:
            if not pattern.match(source):
                continue
            info(label, source)
            target = _target_path(source, extension)
            if overwrite or not os.path.exists(target):
                convert(source, target)

    # Automatic cleanup: keep 10 days of results.
    _purge_old_results()
    return


if __name__ == '__main__':
    import sys

    try:
        # Work relative to the directory that contains this script.
        os.chdir(os.path.dirname(os.path.abspath(__file__)))
        sys.exit(main())
    except KeyboardInterrupt:
        info("")
        # 130 = terminated by Ctrl-C (128 + SIGINT).
        sys.exit(130)