diff --git a/load_data_from_multiple.py b/load_data_from_multiple.py new file mode 100644 index 0000000..9cf71a7 --- /dev/null +++ b/load_data_from_multiple.py @@ -0,0 +1,176 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +from openpyxl import load_workbook +from pathlib import Path +from my_folders import source_file_folder +from dataclasses import dataclass, asdict +import matplotlib.pyplot as plt +from itertools import islice + + +def batched(iterable, batch_size): + it = iter(iterable) + while _batch := list(islice(it, batch_size)): + yield _batch + + +@dataclass(frozen=True) +class Measure: + ts: float # time stamp + v: float + i: float + + +@dataclass +class RawMeasure: + ts: int + ch: int + v: float + + +@dataclass(frozen=True) +class DischargePoint(Measure): + c: float # Capacity + + +column_indexes = { # columns indexes that we want to import: None means try to find the column + 'TimeStamp': 0, + 'Ibatt': None, + 'Vbatt': None, +} + + +def available_measurements(s_in: str): + """ + generates the series of RawMeasure contained in the s_in string + + :param s_in: + :return: + """ + fields = s_in.strip().strip('\n').replace('VDC1', 'VDC 1').split() + for ts, ch, chn, value, unit in batched(fields, 5): + if ch != 'Ch' or not unit.endswith('DC'): + continue + yield RawMeasure(ts=int(ts), ch=int(chn), v=float(value)) + + +def load_a_txt_file(path: Path, duts: dict[str, tuple[int, int]]) -> dict[str, list[Measure]]: + """Returns a sorted list of Measures()""" + + # 1742542808 Ch 101 4.6694100e-03 VDC 1742542808 Ch 102 3.6288222e+00 VDC + # 1742542838 Ch 101 4.6651780e-03 VDC 1742542838 Ch 102 3.6315371e+00 VDC + # 1743758985 Ch 101 2.2117630e-03 VDC 1743758985 Ch 102 2.9622319e+00 VDC1743758985 Ch 103 1.9700000e-03 VDC 1743758985 Ch 104 3.0066588e+00 VDC + # 1743759015 Ch 101 2.1362400e-03 VDC 1743759015 Ch 102 3.0225466e+00 VDC1743759015 Ch 103 1.9600170e-03 VDC 1743759015 Ch 104 3.0766279e+00 VDC + # 1743759046 Ch 101 2.1291870e-03 VDC 1743759046 Ch 102 3.0660180e+00 VDC1743759046 Ch 103 1.9516620e-03 VDC 1743759046 Ch 104 3.1234983e+00 VDC + # 1743759076 Ch 101 2.1232190e-03 VDC 1743759076 Ch 102 3.1013554e+00 VDC1743759076 Ch 103 1.9452600e-03 VDC 1743759076 Ch 104 3.1604323e+00 VDC + # 1743759106 Ch 101 2.1214820e-03 VDC 1743759106 Ch 102 3.1316431e+00 VDC1743759106 Ch 103 1.9383150e-03 VDC 1743759106 Ch 104 3.1913389e+00 VDC + # 0 1 2 3 4 5 6 7 8 9 + discharges = {k: [] for k in duts.keys()} + with open(path, 'r') as fin: + for raw_line in fin: + if len(raw_line) < 10: # arbitrario per rimuovere linee sicuramente sbagliate + continue + if 'slv' in raw_line.lower(): + continue + fields = raw_line.strip().strip('\n').split() + if len(fields) < 10: + continue # remove malformed lines + measures = {m.ch: m for m in available_measurements(raw_line)} + for dut, dsch_ptis in discharges.items(): + chi, chv = duts[dut] + i = measures[chi] + v = measures[chv] + dsch_ptis.append(Measure(ts=i.ts, i=i.v / 0.010007, v=v.v)) + + for disc in discharges.values(): + disc.sort(key=lambda x: x.ts) + return discharges + + +def compute_discharge(measures: list[Measure], initial_capacity: float = 0.0) -> list[DischargePoint]: + discharge_sequence = [] + last_m = None + t0 = None + for m in measures: + if t0 is None: + t0 = m.ts + if last_m is not None: + initial_capacity += (m.i + last_m.i) * 0.5 * (m.ts - last_m.ts) / 3600.0 + meas = asdict(m) + meas['ts'] -= t0 + discharge_sequence.append(DischargePoint(c=initial_capacity, **meas)) + last_m = m + return discharge_sequence + + +def show_a_discharge(discharge: list[DischargePoint], title='Title'): + out_name = f'{title}.png' + if '100_per' in title: + title = 'Battery Option: 100%' + elif '80_per' in title: + title = 'Battery Option: 80%' + else: + title = f'Battery Option: OFF ({title})' + how_many = 1 + fig, axs_maybe = plt.subplots(how_many, 1, tight_layout=False, figsize=(7.85, 5.38)) + # fig.suptitle(f'Recahrging Behavior', fontsize=20) + # fig, ax = plt.subplots(figsize=(11, 6)) + if how_many == 1: + axs = (axs_maybe,) + else: + axs = axs_maybe + ax = axs[0] + ax.grid(True) + axr = ax.twinx() + # ax.set_xlim(0.0, 125) + mins = [] + vbatt = [] + ibatt = [] + caps = [] + for dp in discharge: + mins.append(dp.ts/60.0) + vbatt.append(dp.v) + ibatt.append(dp.i * 1000.0) + caps.append(dp.c * 1000.0) + ax.plot(mins, vbatt, color='g', lw=3, alpha=0.5, label='Battery Voltage') + axr.plot(mins, caps, color='b', lw=3, alpha=0.5, label='Accumulated Capacity') + axr.plot(mins, ibatt, color='r', lw=3, alpha=0.5, label='Charging Current') + ax.set_xlim(0, None) + ax.set_ylim(3.0, 4.3) + axr.set_ylim(0, None) + axr.set_ylabel('Capacity [mAh] / Current [mA]') + ax.set_xlabel('time [minutes]') + ax.set_ylabel('Cell Voltage [V]') + ax.set_title(title) + axr.legend(loc='center right') + ax.legend(loc='upper left') + txt = f'Total Capacity: {caps[-1]:.1f} mAh' + print(txt) + axr.text(mins[-1], caps[-1] - 150, txt, ha='right', fontsize=10, bbox={'facecolor': 'cyan', 'alpha': 0.7, 'pad': 5}) + # fig.tight_layout() + fig.savefig(out_name, dpi=300) + plt.close() + + +DUTS = { + 'SLV15': (101, 102), + 'SLV19': (103, 104), +} + +if __name__ == '__main__': + for f in source_file_folder.iterdir(): + if f.stem != 'log_2025_04_04_09': + continue + if f.is_file(): + if f.suffix.lower() == '.xlsx': + print('skipping ', f) + elif f.suffix.lower() == '.txt': + print(f) + measured_data = load_a_txt_file(f, DUTS) + # print(measured_data) + else: + continue + for name, discharge_points in measured_data.items(): + discharge = compute_discharge(discharge_points) + stem_under = f.stem.replace(' ', '_') + show_a_discharge(discharge, title=f'{name}_{stem_under}') diff --git a/load_data_from_xlsx.py b/load_data_from_xlsx.py index c9ca59d..e3b69fa 100644 --- a/load_data_from_xlsx.py +++ b/load_data_from_xlsx.py @@ -5,6 +5,12 @@ from pathlib import Path from my_folders import source_file_folder from dataclasses import dataclass, asdict import matplotlib.pyplot as plt +from itertools import islice + +def batched(iterable, batch_size): + it = iter(iterable) + while _batch := list(islice(it, batch_size)): + yield _batch @dataclass(frozen=True) @@ -14,6 +20,13 @@ class Measure: i: float +@dataclass +class RawMeasure: + ts: int + ch: int + v: float + + @dataclass(frozen=True) class DischargePoint(Measure): c: float # Capacity @@ -66,12 +79,60 @@ def load_a_txt_file(path: Path) -> list[Measure]: # 1742542808 Ch 101 4.6694100e-03 VDC 1742542808 Ch 102 3.6288222e+00 VDC # 1742542838 Ch 101 4.6651780e-03 VDC 1742542838 Ch 102 3.6315371e+00 VDC + # # 0 1 2 3 4 5 6 7 8 9 discharge = [] with open(path, 'r') as fin: for raw_line in fin: if len(raw_line) < 10: # arbitrario per rimuovere linee sicuramente sbagliate continue + if 'slv' in raw_line.lower(): + continue + fields = raw_line.strip().strip('\n').split() + if len(fields) < 10: + continue # remove malformed lines + ts = int(fields[0]) + v = float(fields[8]) + i = float(fields[3]) + if None not in (ts, v, i): + i /= 0.010007 + measure = Measure(ts=ts, v=v, i=i) + # print(measure) + discharge.append(measure) + + discharge.sort(key=lambda x: x.ts) + return discharge + + +def available_measurements(s_in: str): + """ + generates the series of RawMeasure contained in the s_in string + + :param s_in: + :return: + """ + fields = s_in.strip().strip('\n').replace('VDC1', 'VDC 1').split() + for batch in batched(fields, 5): + ts, ch, chn, value, unit = batch.split() + if ch != 'Ch' or not unit.endswith('DC'): + continue + yield RawMeasure(ts=int(ts), ch=int(chn), v=float(value)) + + +def load_a_txt_file_4(path: Path) -> list[Measure]: + """Returns a sorted list of Measures()""" + + # 1742542808 Ch 101 4.6694100e-03 VDC 1742542808 Ch 102 3.6288222e+00 VDC + # 1742542838 Ch 101 4.6651780e-03 VDC 1742542838 Ch 102 3.6315371e+00 VDC + # 1743758985 Ch 101 2.2117630e-03 VDC 1743758985 Ch 102 2.9622319e+00 VDC1743758985 Ch 103 1.9700000e-03 VDC 1743758985 Ch 104 3.0066588e+00 VDC + # 0 1 2 3 4 5 6 7 8 9 + discharge = [] + with open(path, 'r') as fin: + for raw_line in fin: + if len(raw_line) < 10: # arbitrario per rimuovere linee sicuramente sbagliate + continue + if 'slv' in raw_line.lower(): + continue fields = raw_line.strip().strip('\n').split() if len(fields) < 10: continue # remove malformed lines @@ -155,6 +216,8 @@ def show_a_discharge(discharge: list[DischargePoint], title='Title'): if __name__ == '__main__': for f in source_file_folder.iterdir(): + if f.stem != 'log_2025_04_04_09': + continue if f.is_file(): if f.suffix.lower() == '.xlsx': print(f)