| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- #!/opt/miniconda3/bin/python3
- import os
- import time
- import datetime
- import glob
- import re
- import json
- import pandas
- import xlrd
- import openpyxl as xl
- from openpyxl.worksheet.worksheet import Worksheet
- from openpyxl.cell import MergedCell
- def debug(*values: object, sep: str = None, end: str = None):
- # print(*values, sep=sep, end=end)
- return
- def info(*values: object, sep: str = None, end: str = None):
- print(*values, sep=sep, end=end)
- return
- # 将excel转换为csv
- # 指定sheet_name
- def excel2csv_1(sheet_name, excelfilename, csvfilename):
- wb = xlrd.open_workbook(excelfilename, logfile=open(os.devnull, 'w'))
- df = pandas.read_excel(wb, sheet_name=sheet_name)
- os.makedirs(os.path.dirname(csvfilename), exist_ok=True)
- df.to_csv(csvfilename)
- # 处理合并表格,复制相同内容
- def parse_merged_cell(sheet:Worksheet, row, col):
- cell = sheet.cell(row=row, column=col)
- if isinstance(cell, MergedCell):
- for merged_range in sheet.merged_cells.ranges:
- if cell.coordinate in merged_range:
- cell = sheet.cell(row=merged_range.min_row, column=merged_range.min_col)
- break
- return cell
- # 将excel转换为json,
- # 提取特定流量汇总数据
- def excel2json_1(excelfilename, jsonfilename):
- json_obj = {}
- workbook = xl.load_workbook(excelfilename, data_only=True)
- sheetname = "汇总数据"
- sheet = workbook[sheetname]
- json_obj["核心网A平面"] = json_sheet = {}
- for row_index in range(2,5,2):
- for col_index in range(1,10):
- key_cell = sheet.cell(row=row_index, column=col_index)
- val_cell = sheet.cell(row=row_index+1, column=col_index)
- if key_cell.value and val_cell:
- key = key_cell.value
- val = val_cell.value
- json_sheet[key] = val
- json_obj["核心网B平面"] = json_sheet = {}
- for row_index in range(8,11,2):
- for col_index in range(1,sheet.max_column):
- key_cell = sheet.cell(row=row_index, column=col_index)
- val_cell = sheet.cell(row=row_index+1, column=col_index)
- if key_cell.value and val_cell:
- key = key_cell.value
- val = val_cell.value
- json_sheet[key] = val
- for row_index in range(28,29,2):
- for col_index in range(1,sheet.max_column):
- key_cell = sheet.cell(row=row_index, column=col_index)
- val_cell = sheet.cell(row=row_index+1, column=col_index)
- if key_cell.value and val_cell:
- key = key_cell.value
- val = val_cell.value
- json_obj[key] = val
- for row_index in range(40,41,2):
- for col_index in range(1,sheet.max_column):
- key_cell = sheet.cell(row=row_index, column=col_index)
- val_cell = sheet.cell(row=row_index+1, column=col_index)
- if key_cell.value and val_cell:
- key = key_cell.value
- val = val_cell.value
- json_obj[key] = val
- json_str = json.dumps(json_obj, indent=4, ensure_ascii=False)
- debug(json_str)
- os.makedirs(os.path.dirname(jsonfilename), exist_ok=True)
- with open(jsonfilename, mode='w', encoding='utf8') as fd:
- fd.write(json_str)
- # 将excel转换为json,
- # 处理合并表格数据
- def excel2json_2(excelfilename, jsonfilename):
- json_obj = {}
- workbook = xl.load_workbook(excelfilename, data_only=True)
- for sheetname in workbook.sheetnames:
- sheet = workbook[sheetname]
- json_sheet = []
- debug(sheetname)
- cells = []
- for row_index in range(1,sheet.max_row+1):
- cols = [None]*(sheet.max_column+1)
- cells += [cols]
- allcolsisnone = True
- for col_index in range(1,sheet.max_column+1):
- cell = parse_merged_cell(sheet, row_index, col_index)
- if cell.value:
- allcolsisnone = False
- cols[col_index-1] = str(cell.value).replace("\n","") if isinstance(cell.value, str) else cell.value
- if not allcolsisnone:
- if row_index == 1:
- keys = cells[0]
- for i in range(len(keys)):
- val = cols[i]
- debug(val," \t",end="")
- debug()
- if row_index > 1:
- json_one = {}
- keys = cells[0]
- for i in range(len(keys)):
- key = keys[i]
- val = cols[i]
- debug(type(val).__name__+":"+str(val)," \t",end="")
- if val is not None:
- if key is None:
- key = 'col{}'.format(i+1)
- if isinstance(val, datetime.time):
- val = '{}'.format(val)
- json_one[key] = val
- json_sheet.append(json_one)
- debug()
- if len(json_sheet) > 0:
- json_obj[sheetname] = json_sheet
- json_str = json.dumps(json_obj, indent=4, ensure_ascii=False)
- debug(json_str)
- os.makedirs(os.path.dirname(jsonfilename), exist_ok=True)
- with open(jsonfilename, mode='w', encoding='utf8') as fd:
- fd.write(json_str)
-
- # 覆盖已经存在的结果文件
- overwrite = False
- def main():
- # 文件扫描
- files = glob.glob("report/recent/**", recursive=True)
- for f in files:
- if re.match('''.*/系统一部/SHVPSGSDB1.*.xls''', f):
- info('系统一部 Excel转CSV', f)
- csvfn = f.replace(".xls",".csv") .replace("report/recent/","report/files/")
- if overwrite or not os.path.exists(csvfn):
- excel2csv_1("历史性能",f,csvfn)
- if re.match('''.*/基础设施部/.*.xlsx''', f):
- info('基础设施部 Excel转JSON', f)
- jsonfn = f.replace(".xlsx",".json") .replace("report/recent/","report/files/")
- if overwrite or not os.path.exists(jsonfn):
- excel2json_2(f,jsonfn)
- if re.match('''.*/网络与平台部/关键设备CPU内存使用率统计日报报表.*.xls''', f):
- info('网络与平台部 Excel转CSV', f)
- tofn = f.replace(".xls",".csv") .replace("report/recent/","report/files/")
- if overwrite or not os.path.exists(tofn):
- excel2csv_1("Sheet1",f,tofn)
- if re.match('''.*/网络与平台部/支付系统流量日报.*.xls''', f):
- info('网络与平台部 Excel转JSON', f)
- tofn = f.replace(".xlsx",".json") .replace("report/recent/","report/files/")
- if overwrite or not os.path.exists(tofn):
- excel2json_1(f,tofn)
-
- # 自动清除,保留10天
- files = glob.glob("report/files/**", recursive=True)
- for f in files:
- if os.path.isfile(f) and os.path.getmtime(f) < time.time() - 3600 * 24 * 10:
- os.remove(f)
- # 清除空目录
- dir = os.path.dirname(f)
- while not os.listdir(dir):
- os.rmdir(dir)
- dir = os.path.dirname(dir)
-
- return
- if __name__ == '__main__':
- try:
- # 重定向当前目录
- os.chdir(os.path.dirname(os.path.abspath(__file__)))
- exit(main())
- except KeyboardInterrupt as ki:
- info("")
- exit(130)
|