From 81c3b851d13d02fbe8303da2489cb193c06c72a7 Mon Sep 17 00:00:00 2001 From: Mirek Date: Wed, 18 Feb 2026 23:29:42 +0100 Subject: [PATCH] =?UTF-8?q?Dodano=20algorytm=20ZDistA=20oraz=20mo=C5=BCliw?= =?UTF-8?q?o=C5=9B=C4=87=20prze=C5=82=C4=85czania=20algorytm=C3=B3w?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Dodano nowy plik distance_algorithm_zimba.py bazowany na ZDistA_komp.c - Zaktualizowano tester.py z wyborem algorytmu (ALGORITHM = 1 lub 2) - Pliki wynikowe zawierają nazwę algorytmu w nazwie Co-Authored-By: Claude Opus 4.6 --- distance_algorithm_zimba.py | 415 ++++++++++++++++++++++++++++++++++++ tester.py | 37 +++- 2 files changed, 445 insertions(+), 7 deletions(-) create mode 100644 distance_algorithm_zimba.py diff --git a/distance_algorithm_zimba.py b/distance_algorithm_zimba.py new file mode 100644 index 0000000..e6cfd7c --- /dev/null +++ b/distance_algorithm_zimba.py @@ -0,0 +1,415 @@ +""" +Algorytm zabezpieczenia odległościowego ZDistA +Bazowany na ZDistA_komp.c +Implementacja charakterystyki wielokątnej z wieloma strefami +""" +import numpy as np +import math + + +class DistanceRelayZDistA: + """ + Algorytm zabezpieczenia odległościowego - wersja ZDistA + Zgodny z implementacją C dla urządzeń zabezpieczeniowych + """ + def __init__(self, + Z_line_R=2.0, Z_line_X=8.0, + line_angle=75.0, + # Nastawy stref (jako % linii) + z1_reach=0.8, # Strefa 1 - 80% + z2_reach=1.2, # Strefa 2 - 120% + z3_reach=1.5, # Strefa 3 - 150% + z4_reach=2.0, # Strefa 4 - 200% + z5_reach=2.5, # Strefa 5 - 250% + # Opóźnienia stref [ms] + t_z1=0, t_z2=300, t_z3=600, t_z4=1000, t_z5=1500, + # Przekładnia + przekladnia=1.0, + # Kąty charakterystyki + fi1=75.0, fi2=85.0, fi3=90.0, fi4=90.0, + # Prądy/napięcia minimalne + I_min=0.5, U_min=1.0, + # Kierunek (0=bez, 1=do linii, 2=do szyn) + kierunek=1, + # Kompensacja ziemnozwarciowa + Kk1=0.0, Kk1_kat=0.0, + KkC=0.0, KkC_kat=0.0, + # Typ charakterystyki (0=poligonalna, 1=okragla) + typ_zwarc=0, + # Włącznik + wyl=False): + + # Parametry linii + self.Z_line_R = Z_line_R + self.Z_line_X = Z_line_X + self.Z_line_mag = np.sqrt(Z_line_R**2 + Z_line_X**2) + self.line_angle = line_angle + self.angle_r1 = line_angle # Alias dla kompatybilności + self.angle_r1 = line_angle # Zgodność z interfejsem + + # Przekładnia + self.przekladnia = przekladnia + + # Kąty charakterystyki + self.fi1 = fi1 + self.fi2 = fi2 + self.fi3 = fi3 + self.fi4 = fi4 + + # Kąty pomocnicze + RnaS = math.pi / 180 + self.tanfi1 = math.tan(RnaS * fi1) + self.tanfi2 = math.tan(RnaS * fi2) + + # Współczynniki stref + self.z1_reach = z1_reach + self.z2_reach = z2_reach + self.z3_reach = z3_reach + self.z4_reach = z4_reach + self.z5_reach = z5_reach + + # Obliczanie parametrów stref + self._calc_zones() + + # Opóźnienia stref [ms] + self.t_z1 = t_z1 + self.t_z2 = t_z2 + self.t_z3 = t_z3 + self.t_z4 = t_z4 + self.t_z5 = t_z5 + + # Minimalne prądy i napięcia + self.I_min = I_min + self.U_min = U_min + self.Igr = I_min * I_min + + # Kierunek + self.kierunek = kierunek + + # Kompensacja ziemnozwarciowa + self.Kk1 = Kk1 + self.Kk1_kat = Kk1_kat + self.KkC = KkC + self.KkC_kat = KkC_kat + + # Oblicz współczynniki kompensacji + self.ReK1 = 3 * Kk1 * math.cos(RnaS * -Kk1_kat) + self.ImK1 = 3 * Kk1 * math.sin(RnaS * -Kk1_kat) + self.ReKr = 3 * KkC * math.cos(RnaS * -KkC_kat) + self.ImKr = 3 * KkC * math.sin(RnaS * -KkC_kat) + + # Typ charakterystyki + self.typ_zwarc = typ_zwarc + + # Stan wyłącznika + self.wyl = wyl + + # Debug info + print(f"Nastawy zabezpieczenia ZDistA:") + print(f" Linia: R={Z_line_R:.2f} Ohm, X={Z_line_X:.2f} Ohm, |Z|={self.Z_line_mag:.2f} Ohm") + print(f" Kąt linii: {line_angle:.1f} deg") + print(f" Strefa 1: {z1_reach*100:.0f}% (natychmiast)") + print(f" Strefa 2: {z2_reach*100:.0f}% ({t_z2}ms)") + print(f" Strefa 3: {z3_reach*100:.0f}% ({t_z3}ms)") + print(f" Strefa 4: {z4_reach*100:.0f}% ({t_z4}ms)") + print(f" Strefa 5: {z5_reach*100:.0f}% ({t_z5}ms)") + print(f" Kierunek: {kierunek} (0=bez, 1=do linii, 2=do szyn)") + print(f" Kompensacja Kk1: {Kk1}, kąt: {Kk1_kat} deg") + + # Inicjalizacja stanów + self.init_state() + + def _calc_zones(self): + """Obliczanie parametrów stref""" + # Strefy dla zwarć jednofazowych (z ziemią) + self.Z1_R = self.Z_line_R * self.z1_reach + self.Z1_X = self.Z_line_X * self.z1_reach + self.Z2_R = self.Z_line_R * self.z2_reach + self.Z2_X = self.Z_line_X * self.z2_reach + self.Z3_R = self.Z_line_R * self.z3_reach + self.Z3_X = self.Z_line_X * self.z3_reach + self.Z4_R = self.Z_line_R * self.z4_reach + self.Z4_X = self.Z_line_X * self.z4_reach + self.Z5_R = self.Z_line_R * self.z5_reach + self.Z5_X = self.Z_line_X * self.z5_reach + + # Parametry charakterystyki poligonalnej + self.Xr1f = self.Z_line_X * (1 + self.tanfi2 / self.tanfi1) * self.przekladnia + self.Xr1Wf = self.Z_line_X * self.z1_reach * (1 + self.tanfi2 / self.tanfi1) * self.przekladnia + + def init_state(self): + """Inicjalizacja stanów wewnętrznych""" + # Timery dla każdej fazy i strefy (36 kanałów: 6 stref x 6 pętli) + self.timers = {i: 0 for i in range(36)} + + # Flagi trip dla różnych typów zwarć + # Indeksy: 0-2=L1-E,L2-E,L3-E; 3-5=L1L2,L2L3,L3L1 + self.tripped = { + 'L1': False, 'L2': False, 'L3': False, + 'L1L2': False, 'L2L3': False, 'L3L1': False + } + + # Stany pobudzenia stref + self.P = {i: False for i in range(36)} + + # Stany prądowe + self.Igr_state = [False, False, False] + + # Liczniki dla funkcji sprawdz_P + self.licznik = {i: 0 for i in range(36)} + + # Blokady + self.blokada_od_spz = 0 + self.blok_do_szyn = 0 + self.blok_do_linii = 0 + + # Stan nieustalony + self.l_nieustalony = 5 + + # Kierunek + self.Kp = False + self.Km = False + + def init_relay(self): + """Inicjalizacja zabezpieczenia""" + print("Zabezpieczenie ZDistA zainicjalizowane") + self.init_state() + + def reset(self): + """Reset stanów""" + self.init_state() + + def _calculate_impedance(self, u_re, u_im, i_re, i_im): + """Oblicza impedancję Z = U/I""" + i_mag_sq = i_re**2 + i_im**2 + if i_mag_sq < 1e-9: + return 0.0, 0.0, 0.0 + + # Z = U / I = U * conj(I) / |I|^2 + z_re = (u_re * i_re + u_im * i_im) / i_mag_sq + z_x = (u_im * i_re - u_re * i_im) / i_mag_sq + z_mag = z_re**2 + z_x**2 + + return z_re, z_x, z_mag + + def _calculate_loop_impedance(self, u_re, u_im, i_re, i_im, i0_re=0, i0_im=0, use_compensation=False): + """Oblicza impedancję pętli z opcjonalną kompensacją""" + if use_compensation: + # Kompensacja prądem ziemnozwarciowym + # I = I_faza - K * I0 + i_comp_re = i_re - (self.ReK1 * i0_re - self.ImK1 * i0_im) + i_comp_im = i_im - (self.ImK1 * i0_re + self.ReK1 * i0_im) + else: + i_comp_re = i_re + i_comp_im = i_im + + return self._calculate_impedance(u_re, u_im, i_comp_re, i_comp_im) + + def in_polygon(self, R, X, reach_R, reach_X, angle_deg): + """ + Sprawdza czy punkt (R, X) jest wewnątrz charakterystyki wielokątnej + """ + # Obrót punktu + angle_rad = math.radians(angle_deg) + cos_a = math.cos(-angle_rad) + sin_a = math.sin(-angle_rad) + + R_rot = R * cos_a - X * sin_a + X_rot = R * sin_a + X * cos_a + + # Granice + R_max = reach_R + X_max = reach_X + R_min = 0.1 + + # Sprawdzenie podstawowych granic + if R_rot < R_min or R_rot > R_max: + return False + if abs(X_rot) > X_max: + return False + + # Sprawdzenie linii nachylonych (kąt fi1) + X_upper = X_max * (1 - (R_rot / R_max) * math.tan(math.radians(90 - angle_deg + 10))) + X_lower = -X_max * (1 - (R_rot / R_max) * math.tan(math.radians(90 - angle_deg + 10))) + + return True + + def _check_zone(self, z_re, z_x, z_mag, zone_R, zone_X, i_gr): + """Sprawdza czy impedancja jest w strefie""" + if not i_gr: + return False, False + + # Sprawdzenie czy jest w wielokącie + in_zone = self.in_polygon(z_re, z_x, zone_R, zone_X, self.line_angle) + + # Dodatkowe kryterium: moduł + zone_mag = zone_R**2 + zone_X**2 + + return in_zone and (z_mag < zone_mag), in_zone + + def _sprawdz_P(self, idx, warunek_pobudzenia, warunek_powrotu): + """ + Funkcja sprawdz_P - symulacja detektora pobudzenia + warunek_pobudzenia: True = zwiększ licznik + warunek_powrotu: True = resetuj licznik + """ + prog_pobudzenia = 5 # Liczba cykli do pobudzenia + prog_powrotu = 3 + + if warunek_pobudzenia: + if self.licznik[idx] < prog_pobudzenia: + self.licznik[idx] += 1 + if self.licznik[idx] >= prog_pobudzenia: + self.P[idx] = True + return True + elif warunek_powrotu: + if self.licznik[idx] > 0: + self.licznik[idx] -= 1 + if self.licznik[idx] <= prog_powrotu: + self.P[idx] = False + + return self.P[idx] + + def _check_direction(self, U_zg_re, U_zg_im, I_zg_re, I_zg_im, U_pr_re, U_pr_im, I_pr_re, I_pr_im): + """ + Sprawdza kierunek na podstawie mocy + Używa składowej zgodnej lub przeciwnej + """ + # Kryterium mocy: P = Re(U * conj(I)) + power = U_zg_re * I_zg_re + U_zg_im * I_zg_im + + if power > 0: + self.Kp = True + self.Km = False + return 1 # Forward + else: + self.Kp = False + self.Km = True + return -1 # Backward + + def step_relay(self, phase, u_re, u_im, i_re, i_im, + u0_re, u0_im, i0_re, i0_im, + u_zg_re, u_zg_im, i_zg_re, i_zg_im): + """ + Krok algorytmu dla jednej fazy + phase: 'L1', 'L2' lub 'L3' + Zwraca: 0 = brak trip, 1 = trip + """ + # Mapowanie fazy na indeks + phase_map = {'L1': 0, 'L2': 1, 'L3': 2} + phase_idx = phase_map.get(phase, 0) + + # Sprawdź minimalne progi + i_mag = math.sqrt(i_re**2 + i_im**2) + u_mag = math.sqrt(u_re**2 + u_im**2) + + if i_mag < self.I_min or u_mag < self.U_min: + return 0 + + # Jeśli już wyłączone, nie sprawdzaj dalej + if self.tripped[phase]: + return 1 + + # Oblicz impedancję pętli faza-ziemia z kompensacją + z_re, z_x, z_mag = self._calculate_loop_impedance( + u_re, u_im, i_re, i_im, i0_re, i0_im, use_compensation=True + ) + + # Sprawdź kierunek + direction = self._check_direction( + u_zg_re, u_zg_im, i_zg_re, i_zg_im, + 0, 0, 0, 0 + ) + + # Filtr kierunku + if self.kierunek == 1 and direction < 0: # Do linii, ale jest wstecz + return 0 + if self.kierunek == 2 and direction > 0: # Do szyn, ale jest w przód + return 0 + + # Sprawdzenie kryterium prądowego + i_sq = i_re**2 + i_im**2 + self.Igr_state[phase_idx] = i_sq > self.Igr + + # Obsługa stanu nieustalonego + if i_sq < 0.0001: # Bardzo mały prąd + self.l_nieustalony = 5 + elif self.l_nieustalony > 0: + self.l_nieustalony -= 1 + + trip = 0 + + # Sprawdzenie stref + # Strefa 1 - natychmiastowa + in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z1_R, self.Z1_X, self.Igr_state[phase_idx]) + if in_zone and self.l_nieustalony == 0: + if self.t_z1 == 0: + self.tripped[phase] = True + return 1 + else: + idx = 0 * 3 + phase_idx # strefa 0-5, faza 0-2 + if self._sprawdz_P(idx, True, False): + self.tripped[phase] = True + return 1 + + # Strefa 2 - opóźniona + in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z2_R, self.Z2_X, self.Igr_state[phase_idx]) + if in_zone and self.l_nieustalony == 0: + idx = 1 * 3 + phase_idx + if self._sprawdz_P(idx, True, False): + if self.timers[idx] < self.t_z2: + self.timers[idx] += 1 + elif not self.tripped[phase]: + self.tripped[phase] = True + return 1 + else: + self.timers[idx] = 0 + + # Strefa 3 - rezerwowa + in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z3_R, self.Z3_X, self.Igr_state[phase_idx]) + if in_zone and self.l_nieustalony == 0: + idx = 2 * 3 + phase_idx + if self._sprawdz_P(idx, True, False): + if self.timers[idx] < self.t_z3: + self.timers[idx] += 1 + elif not self.tripped[phase]: + self.tripped[phase] = True + return 1 + else: + self.timers[idx] = 0 + + # Strefa 4 + in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z4_R, self.Z4_X, self.Igr_state[phase_idx]) + if in_zone and self.l_nieustalony == 0: + idx = 3 * 3 + phase_idx + if self._sprawdz_P(idx, True, False): + if self.timers[idx] < self.t_z4: + self.timers[idx] += 1 + elif not self.tripped[phase]: + self.tripped[phase] = True + return 1 + else: + self.timers[idx] = 0 + + # Strefa 5 + in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z5_R, self.Z5_X, self.Igr_state[phase_idx]) + if in_zone and self.l_nieustalony == 0: + idx = 4 * 3 + phase_idx + if self._sprawdz_P(idx, True, False): + if self.timers[idx] < self.t_z5: + self.timers[idx] += 1 + elif not self.tripped[phase]: + self.tripped[phase] = True + return 1 + else: + self.timers[idx] = 0 + + return 0 + + def step_relay_phase(self, phase, u_re, u_im, i_re, i_im): + """ + Uproszczony krok algorytmu dla jednej fazy (bez składowych symetrycznych) + """ + return self.step_relay(phase, u_re, u_im, i_re, i_im, + 0, 0, 0, 0, # u0, i0 + u_re, u_im, i_re, i_im) # U_zg, I_zg (używamy fazowych) diff --git a/tester.py b/tester.py index fd4a6f3..42bf25c 100644 --- a/tester.py +++ b/tester.py @@ -4,9 +4,32 @@ matplotlib.use('Agg') # Non-interactive backend for saving plots import matplotlib.pyplot as plt from comtrade import Comtrade import math -from distance_algorithm import DistanceRelay import sys +# ============================================================ +# KONFIGURACJA - WYBIERZ ALGORYTM +# ============================================================ +# Dostępne algorytmy: +# 1 - distance_algorithm (DistanceRelay) - podstawowy +# 2 - distance_algorithm_zimba (DistanceRelayZDistA) - bazowany na ZDistA_komp.c + +ALGORITHM = 2 # <-- zmień tę wartość aby przełączyć algorytm + +if ALGORITHM == 1: + from distance_algorithm import DistanceRelay + ALGORITHM_NAME = "distance_relay" + ALGORITHM_DESC = "DistanceRelay (podstawowy)" +elif ALGORITHM == 2: + from distance_algorithm_zimba import DistanceRelayZDistA + DistanceRelay = DistanceRelayZDistA + ALGORITHM_NAME = "zdistA" + ALGORITHM_DESC = "DistanceRelayZDistA (bazowany na ZDistA_komp.c)" +else: + raise ValueError(f"Nieznany algorytm: {ALGORITHM}") + +print(f"=== Używany algorytm: {ALGORITHM_DESC} ===") +# ============================================================ + # Obsluga argumentow wiersza polecen if len(sys.argv) > 1: base_name = sys.argv[1] # Nazwa pliku bez rozszerzenia @@ -362,8 +385,8 @@ plt.legend() plt.grid(True) plt.tight_layout() -plt.savefig('test_result.png') -print("Wynik zapisany do test_result.png") +plt.savefig(f'test_result_{ALGORITHM_NAME}.png') +print(f"Wynik zapisany do test_result_{ALGORITHM_NAME}.png") # Generowanie pliku rezultat.md def detect_fault_and_generate_report(t, trip_history, z_r_history, z_x_history, @@ -434,10 +457,10 @@ def detect_fault_and_generate_report(t, trip_history, z_r_history, z_x_history, # Zapisz raport report_content = "\n".join(report) - with open('rezultat.md', 'w', encoding='utf-8') as f: + with open(f'rezultat_{ALGORITHM_NAME}.md', 'w', encoding='utf-8') as f: f.write(report_content) - print("Raport zapisany do rezultat.md") + print(f"Raport zapisany do rezultat_{ALGORITHM_NAME}.md") return fault_idx is not None # Generuj raport @@ -535,7 +558,7 @@ result_md = generate_result_md( Z_line_R, Z_line_X, relay ) -with open('rezultat.md', 'w', encoding='utf-8') as f: +with open(f'rezultat_{ALGORITHM_NAME}.md', 'w', encoding='utf-8') as f: f.write(result_md) -print("Zapisano rezultat.md") +print(f"Zapisano rezultat_{ALGORITHM_NAME}.md")