import numpy as np import matplotlib matplotlib.use('Agg') # Non-interactive backend for saving plots import matplotlib.pyplot as plt from comtrade import Comtrade import math import sys import os # ============================================================ # KONFIGURACJA - WYBIERZ ALGORYTM # ============================================================ # Dostępne algorytmy: # 1 - distance_algorithm (DistanceRelay) - podstawowy # 2 - distance_algorithm_zimba (DistanceRelayZDistA) - bazowany na ZDistA_komp.c ALGORITHM = 2 # <-- zmień tę wartość aby przełączyć algorytm if ALGORITHM == 1: from distance_algorithm import DistanceRelay ALGORITHM_NAME = "distance_relay" ALGORITHM_DESC = "DistanceRelay (podstawowy)" elif ALGORITHM == 2: from distance_algorithm_zimba import DistanceRelayZDistA DistanceRelay = DistanceRelayZDistA ALGORITHM_NAME = "zdistA" ALGORITHM_DESC = "DistanceRelayZDistA (bazowany na ZDistA_komp.c)" else: raise ValueError(f"Nieznany algorytm: {ALGORITHM}") print(f"=== Używany algorytm: {ALGORITHM_DESC} ===") # ============================================================ def parse_settings(directory): """ Parsuje pliki Nastawy.txt lub nastawy stref.txt w podanym katalogu. """ settings = {} for filename in ["Nastawy.txt", "nastawy stref.txt"]: filepath = os.path.join(directory, filename) if os.path.exists(filepath): print(f"Znaleziono plik nastaw: {filepath}") try: with open(filepath, 'r', encoding='utf-8') as f: content = f.readlines() except UnicodeDecodeError: with open(filepath, 'r', encoding='cp1250') as f: content = f.readlines() for line in content: line = line.strip() if not line or line.startswith('#'): continue separator = None if '=' in line: separator = '=' elif ':' in line: separator = ':' if separator: key, value = line.split(separator, 1) key = key.strip() value = value.strip() try: settings[key] = float(value) except ValueError: settings[key] = value break if settings: print("Wczytano następujące nastawy:") for key, value in settings.items(): print(f" {key}: {value}") return settings # Przekładnia prądowa (domyślna) PRZEKLADNIA = 200.0 PRZEKLADNIA_NAPIECIA = 1100.0 KIERUNEK = 0 # 0=bez, 1=do linii, 2=do szyn # Obsluga argumentow wiersza polecen if len(sys.argv) > 1: base_name = sys.argv[1] else: base_name = "zwarcie_testowe" # Wczytywanie nastaw z pliku directory = os.path.dirname(os.path.abspath(base_name)) settings = parse_settings(directory) # Nadpisz domyślne wartości, jeśli istnieją w pliku nastaw PRZEKLADNIA = float(settings.get('Przekladnia pradowa', PRZEKLADNIA)) PRZEKLADNIA_NAPIECIA = float(settings.get('Przekladnia napieciowa', PRZEKLADNIA_NAPIECIA)) KIERUNEK = int(settings.get('Kierunek', KIERUNEK)) print(f"\nUżywane parametry globalne:") print(f" Przekładnia prądowa: {PRZEKLADNIA}") print(f" Przekładnia napięciowa: {PRZEKLADNIA_NAPIECIA}") print(f" Kierunek: {KIERUNEK}\n") PRZEKLADNIA_EFF = PRZEKLADNIA_NAPIECIA / PRZEKLADNIA cfg_file = f"{base_name}.cfg" dat_file = f"{base_name}.dat" print(f"Wczytywanie rejestracji: {cfg_file}") rec = Comtrade() # Probuj rozne kodowania dla plikow CFG encodings = ['utf-8', 'cp1250', 'cp1252', 'latin-1', 'iso-8859-1', 'cp1251', 'cp1253'] loaded = False for encoding in encodings: try: rec.load(cfg_file, dat_file, encoding=encoding) loaded = True break except (UnicodeDecodeError, Exception) as e: continue if not loaded: print(f"BŁĄD: Nie można wczytać pliku {cfg_file}") sys.exit(1) # Sprawdz czy mamy wystarczajaco kanalow analogowych num_analog = len(rec.analog) print(f"Liczba kanalow analogowych: {num_analog}") if num_analog < 6: print(f"BŁĄD: Za mało kanałów analogowych (wymagane minimum 6)") sys.exit(1) # Wyciagniecie danych t = np.array(rec.time) Fs = rec.cfg.sample_rates[0][0] f_nom = 50.0 N = int(Fs / f_nom) if f_nom > 0 else 20 if N < 1: N = 1 print(f" Czestotliwosc probkowania: {Fs} Hz") print(f" Liczba probek na okres: {N}") i1_raw = np.array(rec.analog[0]) i2_raw = np.array(rec.analog[1]) i3_raw = np.array(rec.analog[2]) if num_analog >= 7: u1_raw = np.array(rec.analog[4]) u2_raw = np.array(rec.analog[5]) u3_raw = np.array(rec.analog[6]) else: u1_raw = np.array(rec.analog[3]) u2_raw = np.array(rec.analog[4]) u3_raw = np.array(rec.analog[5]) # === ANALIZA DANYCH - wyznaczenie impedancji przed zwarciem === def fcdft(samples): if len(samples) < N: return 0.0, 0.0 k = np.arange(N) cos_wave = np.cos(2 * np.pi * k / N) sin_wave = np.sin(2 * np.pi * k / N) re = (2.0 / N) * np.sum(samples * cos_wave) im = -(2.0 / N) * np.sum(samples * sin_wave) return re, im def calculate_impedance_from_raw(u_raw, i_raw, idx): if idx < N: return 0, 0 window_i = i_raw[idx-N:idx] window_u = u_raw[idx-N:idx] i_re, i_im = fcdft(window_i) u_re, u_im = fcdft(window_u) i_re /= PRZEKLADNIA i_im /= PRZEKLADNIA u_re /= PRZEKLADNIA_NAPIECIA u_im /= PRZEKLADNIA_NAPIECIA i_mag_sq = i_re**2 + i_im**2 if i_mag_sq < 1e-9: return 0, 0 z_re = (u_re * i_re + u_im * i_im) / i_mag_sq z_x = (u_im * i_re - u_re * i_im) / i_mag_sq return z_re, z_x pre_fault_start = 10 pre_fault_end = min(100, len(t) - N) z_r_list, z_x_list = [], [] step = max(N, 1) for idx in range(pre_fault_start, pre_fault_end, step): z_r, z_x = calculate_impedance_from_raw(u1_raw, i1_raw, idx) if z_r > 0: z_r_list.append(z_r) z_x_list.append(z_x) if z_r_list: Z_line_R = np.median(z_r_list) Z_line_X = np.median(z_x_list) Z_line_mag = np.sqrt(Z_line_R**2 + Z_line_X**2) line_angle = np.degrees(np.arctan2(Z_line_X, Z_line_R)) print(f"\nImpedancja linii (przed zwarciem): R={Z_line_R:.2f}, X={Z_line_X:.2f}, |Z|={Z_line_mag:.2f} Ohm, Kat={line_angle:.1f} deg") else: Z_line_R = 2.0 Z_line_X = 8.0 Z_line_mag = np.sqrt(Z_line_R**2 + Z_line_X**2) line_angle = 75.0 print("Nie mozna wyznaczyc impedancji linii, uzywam wartosci domyslnych") # Utworzenie relay z nastawami z pliku relay = DistanceRelay(Z_line_R=Z_line_R, Z_line_X=Z_line_X, line_angle=line_angle, kierunek=KIERUNEK, settings=settings) # ... (reszta skryptu bez zmian) ... # Macierz operatora obrotu dla składowych symetrycznych a = complex(-0.5, np.sqrt(3)/2) a2 = complex(-0.5, -np.sqrt(3)/2) # Symulacja "czasu rzeczywistego" próbka po próbce relay.reset() relay.init_relay() trip_history_L1 = [] trip_history_L2 = [] trip_history_L3 = [] # Impedance phasors (R + jX) z1_r_history = [] z1_x_history = [] z2_r_history = [] z2_x_history = [] z3_r_history = [] z3_x_history = [] def calculate_impedance(u_cpx, i_cpx): """Oblicza impedancję Z = U/I jako liczbę zespoloną""" # Konwersja do wartości wtórnych i_cpx = complex(i_cpx.real / PRZEKLADNIA, i_cpx.imag / PRZEKLADNIA) u_cpx = complex(u_cpx.real / PRZEKLADNIA_NAPIECIA, u_cpx.imag / PRZEKLADNIA_NAPIECIA) i_mag_sq = i_cpx.real**2 + i_cpx.imag**2 if i_mag_sq < 1e-9: # Zabezpieczenie przed dzieleniem przez zero return 0.0, 0.0 # Z = U / I = U * conj(I) / |I|^2 z_cpx = u_cpx * complex(i_cpx.real, -i_cpx.imag) / i_mag_sq return z_cpx.real, z_cpx.imag # Symulacja "czasu rzeczywistego" próbka po próbce for i in range(N, len(t)): # Pobranie okna danych (historyczne N próbek aż do obecnej chwili i) window_i1 = i1_raw[i-N:i] window_i2 = i2_raw[i-N:i] window_i3 = i3_raw[i-N:i] window_u1 = u1_raw[i-N:i] window_u2 = u2_raw[i-N:i] window_u3 = u3_raw[i-N:i] # Filtracja DFT dla wszystkich faz i1_re, i1_im = fcdft(window_i1) i2_re, i2_im = fcdft(window_i2) i3_re, i3_im = fcdft(window_i3) u1_re, u1_im = fcdft(window_u1) u2_re, u2_im = fcdft(window_u2) u3_re, u3_im = fcdft(window_u3) # Tworzenie liczb zespolonych I1_cpx = complex(i1_re, i1_im) I2_cpx = complex(i2_re, i2_im) I3_cpx = complex(i3_re, i3_im) U1_cpx = complex(u1_re, u1_im) U2_cpx = complex(u2_re, u2_im) U3_cpx = complex(u3_re, u3_im) # Obliczanie składowych symetrycznych I0_cpx = (I1_cpx + I2_cpx + I3_cpx) / 3.0 I1zg_cpx = (I1_cpx + a * I2_cpx + a2 * I3_cpx) / 3.0 U0_cpx = (U1_cpx + U2_cpx + U3_cpx) / 3.0 U1zg_cpx = (U1_cpx + a * U2_cpx + a2 * U3_cpx) / 3.0 # Przekazanie danych do algorytmu zabezpieczeniowego trip_l1 = relay.step_relay('L1', U1_cpx.real, U1_cpx.imag, I1_cpx.real, I1_cpx.imag, I0_cpx.real, I0_cpx.imag, U0_cpx.real, U0_cpx.imag, U1zg_cpx.real, U1zg_cpx.imag, I1zg_cpx.real, I1zg_cpx.imag) trip_l2 = relay.step_relay('L2', U2_cpx.real, U2_cpx.imag, I2_cpx.real, I2_cpx.imag, I0_cpx.real, I0_cpx.imag, U0_cpx.real, U0_cpx.imag, U1zg_cpx.real, U1zg_cpx.imag, I1zg_cpx.real, I1zg_cpx.imag) trip_l3 = relay.step_relay('L3', U3_cpx.real, U3_cpx.imag, I3_cpx.real, I3_cpx.imag, I0_cpx.real, I0_cpx.imag, U0_cpx.real, U0_cpx.imag, U1zg_cpx.real, U1zg_cpx.imag, I1zg_cpx.real, I1zg_cpx.imag) trip_history_L1.append(trip_l1) trip_history_L2.append(trip_l2) trip_history_L3.append(trip_l3) z1_r, z1_x = calculate_impedance(U1_cpx, I1_cpx) z2_r, z2_x = calculate_impedance(U2_cpx, I2_cpx) z3_r, z3_x = calculate_impedance(U3_cpx, I3_cpx) z1_r_history.append(z1_r) z1_x_history.append(z1_x) z2_r_history.append(z2_r) z2_x_history.append(z2_x) z3_r_history.append(z3_r) z3_x_history.append(z3_x) # 5. Rysowanie wyników plt.figure(figsize=(16, 12)) # ... (reszta kodu rysującego i generującego raporty bez zmian, ponieważ bazuje na obiekcie `relay`, który jest już poprawnie skonfigurowany) # Prądy faz L1, L2, L3 plt.subplot(3, 2, 1) plt.plot(t[N:], i1_raw[N:], label='I_L1', color='blue') plt.plot(t[N:], i2_raw[N:], label='I_L2', color='green') plt.plot(t[N:], i3_raw[N:], label='I_L3', color='orange') plt.title('Prady faz L1, L2, L3') plt.xlabel('Czas [s]') plt.ylabel('Prad [A]') plt.legend() plt.grid(True) # Charakterystyka R-X plt.subplot(3, 2, 2) angle_rad = math.radians(relay.angle_r1) # Strefa 1 if hasattr(relay, 'Z1_R'): plt.fill_between(np.linspace(0, relay.Z1_R, 2), -relay.Z1_X, relay.Z1_X, alpha=0.2, color='green', label='Strefa 1') # Strefa 2 if hasattr(relay, 'Z2_R'): plt.fill_between(np.linspace(0, relay.Z2_R, 2), -relay.Z2_X, relay.Z2_X, alpha=0.15, color='yellow', label='Strefa 2') # Strefa 3 if hasattr(relay, 'Z3_R'): plt.fill_between(np.linspace(0, relay.Z3_R, 2), -relay.Z3_X, relay.Z3_X, alpha=0.1, color='red', label='Strefa 3') z_line = np.linspace(0, relay.Z_line_mag * 1.5, 50) x_line = z_line * math.tan(angle_rad) plt.plot(z_line, x_line, 'k--', linewidth=1, label='Linia Z') plt.plot(z1_r_history, z1_x_history, 'b.', markersize=1, alpha=0.5, label='Trajektoria L1') plt.plot(z2_r_history, z2_x_history, 'g.', markersize=1, alpha=0.5, label='Trajektoria L2') plt.plot(z3_r_history, z3_x_history, 'o', color='orange', markersize=1, alpha=0.5, label='Trajektoria L3') xlim_max = (hasattr(relay, 'Z3_R') and relay.Z3_R or (hasattr(relay, 'Z5_R') and relay.Z5_R or 10)) ylim_max = (hasattr(relay, 'Z3_X') and relay.Z3_X or (hasattr(relay, 'Z5_X') and relay.Z5_X or 20)) plt.xlim(-1, xlim_max * 1.2) plt.ylim(-ylim_max, ylim_max * 1.2) plt.xlabel('R [Ohm]') plt.ylabel('X [Ohm]') plt.title('Charakterystyka R-X zabezpieczenia odleglosciowego') plt.legend(loc='upper right', fontsize=8) plt.grid(True) plt.axhline(y=0, color='k', linewidth=0.5) plt.axvline(x=0, color='k', linewidth=0.5) # Napięcia faz L1, L2, L3 plt.subplot(3, 2, 3) plt.plot(t[N:], u1_raw[N:], label='U_L1', color='blue') plt.plot(t[N:], u2_raw[N:], label='U_L2', color='green') plt.plot(t[N:], u3_raw[N:], label='U_L3', color='orange') plt.title('Napięcia faz L1, L2, L3') plt.xlabel('Czas [s]') plt.ylabel('Napięcie [V]') plt.legend() plt.grid(True) # Impedancje faz L1, L2, L3 plt.subplot(3, 2, 4) plt.plot(t[N:], z1_r_history, label='R_L1', color='blue', linestyle='-') plt.plot(t[N:], z1_x_history, label='X_L1', color='blue', linestyle='--') plt.plot(t[N:], z2_r_history, label='R_L2', color='green', linestyle='-') plt.plot(t[N:], z2_x_history, label='X_L2', color='green', linestyle='--') plt.plot(t[N:], z3_r_history, label='R_L3', color='orange', linestyle='-') plt.plot(t[N:], z3_x_history, label='X_L3', color='orange', linestyle='--') plt.title('Impedancje faz (R + jX)') plt.xlabel('Czas [s]') plt.ylabel('Impedancja [Ohm]') plt.legend() plt.grid(True) # Wyjscia zabezpieczenia plt.subplot(3, 2, 5) max_val = max(max(i1_raw), max(i2_raw), max(i3_raw)) if any(i1_raw) else 1 plt.plot(t[N:], i1_raw[N:], label='I_L1', color='blue', alpha=0.5) plt.plot(t[N:], i2_raw[N:], label='I_L2', color='green', alpha=0.5) plt.plot(t[N:], i3_raw[N:], label='I_L3', color='orange', alpha=0.5) plt.plot(t[N:], np.array(trip_history_L1) * max_val, label='Trip L1-E', color='red', linewidth=2) plt.plot(t[N:], np.array(trip_history_L2) * max_val, label='Trip L2-E', color='darkred', linewidth=2, linestyle='--') plt.plot(t[N:], np.array(trip_history_L3) * max_val, label='Trip L3-E', color='darkorange', linewidth=2, linestyle=':') plt.title(f'Wynik testu algorytmu {ALGORITHM_NAME}') plt.xlabel('Czas [s]') plt.ylabel('Wartosc') plt.legend() plt.grid(True) # Modul impedancji plt.subplot(3, 2, 6) z1_mag = np.sqrt(np.array(z1_r_history)**2 + np.array(z1_x_history)**2) z2_mag = np.sqrt(np.array(z2_r_history)**2 + np.array(z2_x_history)**2) z3_mag = np.sqrt(np.array(z3_r_history)**2 + np.array(z3_x_history)**2) plt.plot(t[N:], z1_mag, label='|Z_L1|', color='blue') plt.plot(t[N:], z2_mag, label='|Z_L2|', color='green') plt.plot(t[N:], z3_mag, label='|Z_L3|', color='orange') if hasattr(relay, 'Z1_R'): z1_reach = np.sqrt(relay.Z1_R**2 + relay.Z1_X**2) plt.axhline(y=z1_reach, color='green', linestyle='--', label='Z1 reach') if hasattr(relay, 'Z2_R'): z2_reach = np.sqrt(relay.Z2_R**2 + relay.Z2_X**2) plt.axhline(y=z2_reach, color='orange', linestyle='--', label='Z2 reach') if hasattr(relay, 'Z3_R'): z3_reach = np.sqrt(relay.Z3_R**2 + relay.Z3_X**2) plt.axhline(y=z3_reach, color='red', linestyle='--', label='Z3 reach') plt.title('Modul impedancji |Z|') plt.xlabel('Czas [s]') plt.ylabel('|Z| [Ohm]') plt.legend() plt.grid(True) plt.tight_layout() output_filename = f"{os.path.splitext(base_name)[0]}_result.png" plt.savefig(output_filename) print(f"Wynik zapisany do {output_filename}") # Generowanie pliku rezultat.md def generate_result_md(t, trip_history_L1, trip_history_L2, trip_history_L3, z1_r_history, z1_x_history, relay, base_name): """Generuje plik rezultat.md z informacjami o wykryciu zwarcia""" md_content = [f"# Wynik analizy dla {os.path.basename(base_name)}", ""] md_content.append(f"## Parametry zabezpieczenia ({ALGORITHM_DESC})") md_content.append(f"- Impedancja linii: R={relay.Z_line_R:.2f} Ohm, X={relay.Z_line_X:.2f} Ohm") md_content.append(f"- Kat linii: {relay.line_angle:.1f} st.") if hasattr(relay, 'z1_reach'): md_content.append(f"- Strefa 1: {relay.z1_reach*100:.0f}% ({relay.t_z1}ms)") md_content.append(f"- Strefa 2: {relay.z2_reach*100:.0f}% ({relay.t_z2}ms)") md_content.append(f"- Strefa 3: {relay.z3_reach*100:.0f}% ({relay.t_z3}ms)") else: md_content.append(f"- Strefa 1: R={relay.Z1_R:.2f} Ohm, X={relay.Z1_X:.2f} Ohm ({relay.tZ1}ms)") md_content.append(f"- Strefa 2: R={relay.Z2_R:.2f} Ohm, X={relay.Z2_X:.2f} Ohm ({relay.tZ2}ms)") md_content.append(f"- Strefa 3: R={relay.Z3_R:.2f} Ohm, X={relay.Z3_X:.2f} Ohm ({relay.tZ3}ms)") md_content.append("") trip_any = any(trip_history_L1) or any(trip_history_L2) or any(trip_history_L3) if trip_any: md_content.append("## Wykrycie zwarcia: TAK") fault_time, fault_phase, fault_idx = -1, "Brak", -1 for i, (t1, t2, t3) in enumerate(zip(trip_history_L1, trip_history_L2, trip_history_L3)): if t1 or t2 or t3: fault_time = t[N + i] fault_idx = i if t1: fault_phase = "L1" elif t2: fault_phase = "L2" else: fault_phase = "L3" break md_content.append(f"- Czas wykrycia: {fault_time * 1000:.2f} ms") md_content.append(f"- Faza: {fault_phase}") z_r, z_x = z1_r_history[fault_idx], z1_x_history[fault_idx] z_mag = math.sqrt(z_r**2 + z_x**2) z_angle = math.degrees(math.atan2(z_x, z_r)) md_content.append("## Wartości w momencie zwarcia") md_content.append(f"- R = {z_r:.4f} Ohm, X = {z_x:.4f} Ohm") md_content.append(f"- |Z| = {z_mag:.4f} Ohm, Kat Z = {z_angle:.2f} st.") else: md_content.append("## Wykrycie zwarcia: NIE") report_filename = f"{os.path.splitext(base_name)[0]}_rezultat.md" with open(report_filename, 'w', encoding='utf-8') as f: f.write("\n".join(md_content)) print(f"Raport zapisany do {report_filename}") generate_result_md(t, trip_history_L1, trip_history_L2, trip_history_L3, z1_r_history, z1_x_history, relay, base_name) print("Analiza zakończona.")