#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Load battery log files (.xlsx / .txt), integrate the current over time and plot the result."""
import re
from dataclasses import dataclass, asdict
from itertools import islice
from pathlib import Path

import matplotlib.pyplot as plt
from openpyxl import load_workbook

from my_folders import source_file_folder

# Divisor applied to the raw current-channel reading to obtain ``Measure.i``.
# NOTE(review): presumably the shunt resistance in ohm (volt reading -> ampere) - confirm.
CURRENT_SCALE = 0.010007

# A unit token directly followed by a digit, e.g. 'VDC1743758985': the logger
# sometimes omits the blank between one measurement and the next time stamp.
_FUSED_UNIT_RE = re.compile(r'DC(?=\d)')


def batched(iterable, batch_size):
    """Yield successive lists of at most ``batch_size`` items taken from ``iterable``.

    The last list may be shorter than ``batch_size``; nothing is yielded for an
    empty iterable.
    """
    it = iter(iterable)
    while _batch := list(islice(it, batch_size)):
        yield _batch


@dataclass(frozen=True)
class Measure:
    """One sample of the log: time stamp, voltage and current."""
    ts: float  # time stamp
    v: float  # value of the 'Vbatt' column / second logged channel
    i: float  # raw reading of the current channel divided by CURRENT_SCALE


@dataclass
class RawMeasure:
    """One single-channel reading as found in a text log line."""
    ts: int  # time stamp
    ch: int  # channel number (e.g. 101, 102)
    v: float  # value read on that channel


@dataclass(frozen=True)
class DischargePoint(Measure):
    """A Measure with the time stamp made relative to the first sample, plus the accumulated capacity."""
    c: float  # Capacity


# Indexes of the columns that we want to import from the spreadsheet:
# None means "try to find the column by its header name".
column_indexes = {
    'TimeStamp': 0,
    'Ibatt': None,
    'Vbatt': None,
}


def load_a_xlsx_file(path: Path) -> list[Measure]:
    """Read the active sheet of an .xlsx workbook and return its Measures sorted by time stamp.

    Rows are scanned until every column listed in ``column_indexes`` is
    located by a case-insensitive match on its header text.  The value used
    for 'Ibatt' is taken from the column immediately to the LEFT of the
    'Ibatt' header and divided by ``CURRENT_SCALE``.

    Rows that are empty, that come before the header row, or whose three
    cells are not all numeric (the header row itself, partially filled rows)
    are skipped.

    The located indexes are kept in a per-call copy, so the module-level
    ``column_indexes`` is never modified and one file cannot influence the
    parsing of the next.

    :param path: the workbook to read
    :return: the measures found, sorted by ``ts`` (empty when no usable row exists)
    """
    indexes = dict(column_indexes)
    discharge = []
    wb_obj = load_workbook(path)
    # Get workbook active sheet object from the active attribute
    sheet_obj = wb_obj.active
    header_row_already_seen = False
    for row in sheet_obj.values:
        if all(c is None for c in row):
            continue  # Do skip empty lines
        if not header_row_already_seen:
            for idx, value in enumerate(row):
                if not isinstance(value, str):
                    continue
                for k in indexes:
                    if k.lower() == value.lower():
                        indexes[k] = idx if k != 'Ibatt' else idx - 1
            header_row_already_seen = all(v is not None for v in indexes.values())
            if not header_row_already_seen:
                continue  # columns still unknown: this row cannot be read as data
            print(indexes)
        ts = row[indexes['TimeStamp']]
        v = row[indexes['Vbatt']]
        i = row[indexes['Ibatt']]
        # Header cells are strings and missing cells are None: both are skipped here.
        if not all(isinstance(x, (int, float)) for x in (ts, v, i)):
            continue
        discharge.append(Measure(ts=ts, v=v, i=i / CURRENT_SCALE))
    discharge.sort(key=lambda x: x.ts)
    return discharge


def _parse_txt_line(raw_line: str):
    """Return the Measure encoded in one text-log line, or None when the line must be skipped."""
    if len(raw_line) < 10:  # arbitrary threshold to drop lines that are certainly wrong
        return None
    if 'slv' in raw_line.lower():
        return None
    fields = raw_line.split()
    if len(fields) < 10:
        return None  # remove malformed lines
    try:
        ts = int(fields[0])
        v = float(fields[8])
        i = float(fields[3])
    except ValueError:
        return None  # a non-numeric field is a malformed line too
    return Measure(ts=ts, v=v, i=i / CURRENT_SCALE)


def load_a_txt_file(path: Path) -> list[Measure]:
    """Read a text log and return its Measures sorted by time stamp.

    Expected line layout (whitespace separated)::

        1742542808 Ch 101 4.6694100e-03 VDC 1742542808 Ch 102 3.6288222e+00 VDC
        0          1  2   3             4   5          6  7   8             9

    Field 0 is the time stamp, field 3 the current-channel reading (divided
    by ``CURRENT_SCALE``) and field 8 the voltage.  Lines shorter than 10
    characters, containing 'slv' (any case), having fewer than 10 fields or
    holding non-numeric values in the fields above are skipped.

    :param path: the text file to read
    :return: the measures found, sorted by ``ts``
    """
    with open(path, 'r') as fin:
        discharge = [m for m in map(_parse_txt_line, fin) if m is not None]
    discharge.sort(key=lambda x: x.ts)
    return discharge


def available_measurements(s_in: str):
    """Generate the series of RawMeasure contained in the ``s_in`` string.

    The string is split on whitespace and consumed five tokens at a time:
    ``<time stamp> Ch <channel> <value> <unit>``.  A unit fused with the next
    time stamp (e.g. 'VDC1743758985') is separated first.  Groups that are
    incomplete, do not have 'Ch' as second token, whose unit does not end in
    'DC', or whose numbers cannot be parsed are skipped.

    :param s_in: one line of the text log
    :return: a generator of RawMeasure
    """
    fields = _FUSED_UNIT_RE.sub('DC ', s_in.strip()).split()
    for batch in batched(fields, 5):
        if len(batch) != 5:
            continue  # trailing partial group
        ts, ch, chn, value, unit = batch
        if ch != 'Ch' or not unit.endswith('DC'):
            continue
        try:
            yield RawMeasure(ts=int(ts), ch=int(chn), v=float(value))
        except ValueError:
            continue


def load_a_txt_file_4(path: Path) -> list[Measure]:
    """Read a four-channel text log and return its Measures sorted by time stamp.

    Sample line::

        1743758985 Ch 101 2.2117630e-03 VDC 1743758985 Ch 102 2.9622319e+00 VDC1743758985 Ch 103 1.9700000e-03 VDC 1743758985 Ch 104 3.0066588e+00 VDC

    The parsing is exactly the one of ``load_a_txt_file``: only fields 0, 3
    and 8 (time stamp, channel 101 and channel 102 in the sample above) are
    used.
    NOTE(review): channels 103/104 are ignored; if they are needed,
    ``available_measurements`` can decode every channel of a line - confirm
    the intended behaviour.

    :param path: the text file to read
    :return: the measures found, sorted by ``ts``
    """
    return load_a_txt_file(path)


def compute_discharge(measures: list[Measure], initial_capacity: float = 0.0) -> list[DischargePoint]:
    """Integrate the current over time and return one DischargePoint per Measure.

    The capacity starts at ``initial_capacity`` and grows, between two
    consecutive samples, by the mean of their currents multiplied by the
    time-stamp difference divided by 3600 (trapezoidal rule).  Time stamps in
    the result are relative to the first sample.

    :param measures: the samples, expected in chronological order
    :param initial_capacity: capacity assigned to the first sample
    :return: the discharge points, in the same order as ``measures``
    """
    discharge_sequence = []
    capacity = initial_capacity
    last_m = None
    t0 = None
    for m in measures:
        if t0 is None:
            t0 = m.ts
        if last_m is not None:
            capacity += (m.i + last_m.i) * 0.5 * (m.ts - last_m.ts) / 3600.0
        meas = asdict(m)
        meas['ts'] -= t0
        discharge_sequence.append(DischargePoint(c=capacity, **meas))
        last_m = m
    return discharge_sequence


def show_a_discharge(discharge: list[DischargePoint], title='Title'):
    """Plot voltage, current and accumulated capacity versus time and save the figure as a PNG.

    The file is written to ``f'{title}.png'`` (relative to the current
    working directory) using the title as received; the title shown on the
    plot is then derived from it ('100_per' / '80_per' markers, otherwise
    "OFF").  The total capacity (last point) is printed and annotated on the
    plot.

    :param discharge: the points to plot; must not be empty
    :param title: base name of the output file and source of the plot title
    :raises ValueError: if ``discharge`` is empty
    """
    if not discharge:
        # Checked before any figure is created, so nothing is left open.
        raise ValueError('cannot plot an empty discharge sequence')

    out_name = f'{title}.png'
    if '100_per' in title:
        title = 'Battery Option: 100%'
    elif '80_per' in title:
        title = 'Battery Option: 80%'
    else:
        title = f'Battery Option: OFF ({title})'

    # x axis in minutes; current and capacity scaled by 1000 for the mA / mAh axis.
    mins = [dp.ts / 60.0 for dp in discharge]
    vbatt = [dp.v for dp in discharge]
    ibatt = [dp.i * 1000.0 for dp in discharge]
    caps = [dp.c * 1000.0 for dp in discharge]

    fig, ax = plt.subplots(1, 1, tight_layout=False, figsize=(7.85, 5.38))
    try:
        ax.grid(True)
        axr = ax.twinx()  # right-hand axis shared by capacity and current
        ax.plot(mins, vbatt, color='g', lw=3, alpha=0.5, label='Battery Voltage')
        axr.plot(mins, caps, color='b', lw=3, alpha=0.5, label='Accumulated Capacity')
        axr.plot(mins, ibatt, color='r', lw=3, alpha=0.5, label='Charging Current')
        ax.set_xlim(0, None)
        ax.set_ylim(3.0, 4.3)
        axr.set_ylim(0, 930.0)
        axr.set_ylabel('Capacity [mAh] / Current [mA]')
        ax.set_xlabel('time [minutes]')
        ax.set_ylabel('Cell Voltage [V]')
        ax.set_title(title)
        axr.legend(loc='center right')
        ax.legend(loc='upper left')
        txt = f'Total Capacity: {caps[-1]:.1f} mAh'
        print(txt)
        axr.text(mins[-1], caps[-1] - 150, txt, ha='right', fontsize=10,
                 bbox={'facecolor': 'cyan', 'alpha': 0.7, 'pad': 5})
        fig.savefig(out_name, dpi=300)
    finally:
        plt.close(fig)


if __name__ == '__main__':
    # Loader to use for each supported file suffix.
    loaders = {
        '.xlsx': load_a_xlsx_file,
        '.txt': load_a_txt_file,
    }
    for f in source_file_folder.iterdir():
        if f.stem != 'log_2025_04_04_09':
            continue
        if not f.is_file():
            continue
        loader = loaders.get(f.suffix.lower())
        if loader is None:
            continue
        print(f)
        measured_data = loader(f)
        discharge = compute_discharge(measured_data)
        show_a_discharge(discharge, title=f.stem.replace(' ', '_'))