Dodano algorytm ZDistA oraz możliwość przełączania algorytmów
- Dodano nowy plik distance_algorithm_zimba.py bazowany na ZDistA_komp.c - Zaktualizowano tester.py z wyborem algorytmu (ALGORITHM = 1 lub 2) - Pliki wynikowe zawierają nazwę algorytmu w nazwie Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
415
distance_algorithm_zimba.py
Normal file
415
distance_algorithm_zimba.py
Normal file
@@ -0,0 +1,415 @@
|
||||
"""
|
||||
Algorytm zabezpieczenia odległościowego ZDistA
|
||||
Bazowany na ZDistA_komp.c
|
||||
Implementacja charakterystyki wielokątnej z wieloma strefami
|
||||
"""
|
||||
import numpy as np
|
||||
import math
|
||||
|
||||
|
||||
class DistanceRelayZDistA:
|
||||
"""
|
||||
Algorytm zabezpieczenia odległościowego - wersja ZDistA
|
||||
Zgodny z implementacją C dla urządzeń zabezpieczeniowych
|
||||
"""
|
||||
def __init__(self,
|
||||
Z_line_R=2.0, Z_line_X=8.0,
|
||||
line_angle=75.0,
|
||||
# Nastawy stref (jako % linii)
|
||||
z1_reach=0.8, # Strefa 1 - 80%
|
||||
z2_reach=1.2, # Strefa 2 - 120%
|
||||
z3_reach=1.5, # Strefa 3 - 150%
|
||||
z4_reach=2.0, # Strefa 4 - 200%
|
||||
z5_reach=2.5, # Strefa 5 - 250%
|
||||
# Opóźnienia stref [ms]
|
||||
t_z1=0, t_z2=300, t_z3=600, t_z4=1000, t_z5=1500,
|
||||
# Przekładnia
|
||||
przekladnia=1.0,
|
||||
# Kąty charakterystyki
|
||||
fi1=75.0, fi2=85.0, fi3=90.0, fi4=90.0,
|
||||
# Prądy/napięcia minimalne
|
||||
I_min=0.5, U_min=1.0,
|
||||
# Kierunek (0=bez, 1=do linii, 2=do szyn)
|
||||
kierunek=1,
|
||||
# Kompensacja ziemnozwarciowa
|
||||
Kk1=0.0, Kk1_kat=0.0,
|
||||
KkC=0.0, KkC_kat=0.0,
|
||||
# Typ charakterystyki (0=poligonalna, 1=okragla)
|
||||
typ_zwarc=0,
|
||||
# Włącznik
|
||||
wyl=False):
|
||||
|
||||
# Parametry linii
|
||||
self.Z_line_R = Z_line_R
|
||||
self.Z_line_X = Z_line_X
|
||||
self.Z_line_mag = np.sqrt(Z_line_R**2 + Z_line_X**2)
|
||||
self.line_angle = line_angle
|
||||
self.angle_r1 = line_angle # Alias dla kompatybilności
|
||||
self.angle_r1 = line_angle # Zgodność z interfejsem
|
||||
|
||||
# Przekładnia
|
||||
self.przekladnia = przekladnia
|
||||
|
||||
# Kąty charakterystyki
|
||||
self.fi1 = fi1
|
||||
self.fi2 = fi2
|
||||
self.fi3 = fi3
|
||||
self.fi4 = fi4
|
||||
|
||||
# Kąty pomocnicze
|
||||
RnaS = math.pi / 180
|
||||
self.tanfi1 = math.tan(RnaS * fi1)
|
||||
self.tanfi2 = math.tan(RnaS * fi2)
|
||||
|
||||
# Współczynniki stref
|
||||
self.z1_reach = z1_reach
|
||||
self.z2_reach = z2_reach
|
||||
self.z3_reach = z3_reach
|
||||
self.z4_reach = z4_reach
|
||||
self.z5_reach = z5_reach
|
||||
|
||||
# Obliczanie parametrów stref
|
||||
self._calc_zones()
|
||||
|
||||
# Opóźnienia stref [ms]
|
||||
self.t_z1 = t_z1
|
||||
self.t_z2 = t_z2
|
||||
self.t_z3 = t_z3
|
||||
self.t_z4 = t_z4
|
||||
self.t_z5 = t_z5
|
||||
|
||||
# Minimalne prądy i napięcia
|
||||
self.I_min = I_min
|
||||
self.U_min = U_min
|
||||
self.Igr = I_min * I_min
|
||||
|
||||
# Kierunek
|
||||
self.kierunek = kierunek
|
||||
|
||||
# Kompensacja ziemnozwarciowa
|
||||
self.Kk1 = Kk1
|
||||
self.Kk1_kat = Kk1_kat
|
||||
self.KkC = KkC
|
||||
self.KkC_kat = KkC_kat
|
||||
|
||||
# Oblicz współczynniki kompensacji
|
||||
self.ReK1 = 3 * Kk1 * math.cos(RnaS * -Kk1_kat)
|
||||
self.ImK1 = 3 * Kk1 * math.sin(RnaS * -Kk1_kat)
|
||||
self.ReKr = 3 * KkC * math.cos(RnaS * -KkC_kat)
|
||||
self.ImKr = 3 * KkC * math.sin(RnaS * -KkC_kat)
|
||||
|
||||
# Typ charakterystyki
|
||||
self.typ_zwarc = typ_zwarc
|
||||
|
||||
# Stan wyłącznika
|
||||
self.wyl = wyl
|
||||
|
||||
# Debug info
|
||||
print(f"Nastawy zabezpieczenia ZDistA:")
|
||||
print(f" Linia: R={Z_line_R:.2f} Ohm, X={Z_line_X:.2f} Ohm, |Z|={self.Z_line_mag:.2f} Ohm")
|
||||
print(f" Kąt linii: {line_angle:.1f} deg")
|
||||
print(f" Strefa 1: {z1_reach*100:.0f}% (natychmiast)")
|
||||
print(f" Strefa 2: {z2_reach*100:.0f}% ({t_z2}ms)")
|
||||
print(f" Strefa 3: {z3_reach*100:.0f}% ({t_z3}ms)")
|
||||
print(f" Strefa 4: {z4_reach*100:.0f}% ({t_z4}ms)")
|
||||
print(f" Strefa 5: {z5_reach*100:.0f}% ({t_z5}ms)")
|
||||
print(f" Kierunek: {kierunek} (0=bez, 1=do linii, 2=do szyn)")
|
||||
print(f" Kompensacja Kk1: {Kk1}, kąt: {Kk1_kat} deg")
|
||||
|
||||
# Inicjalizacja stanów
|
||||
self.init_state()
|
||||
|
||||
def _calc_zones(self):
|
||||
"""Obliczanie parametrów stref"""
|
||||
# Strefy dla zwarć jednofazowych (z ziemią)
|
||||
self.Z1_R = self.Z_line_R * self.z1_reach
|
||||
self.Z1_X = self.Z_line_X * self.z1_reach
|
||||
self.Z2_R = self.Z_line_R * self.z2_reach
|
||||
self.Z2_X = self.Z_line_X * self.z2_reach
|
||||
self.Z3_R = self.Z_line_R * self.z3_reach
|
||||
self.Z3_X = self.Z_line_X * self.z3_reach
|
||||
self.Z4_R = self.Z_line_R * self.z4_reach
|
||||
self.Z4_X = self.Z_line_X * self.z4_reach
|
||||
self.Z5_R = self.Z_line_R * self.z5_reach
|
||||
self.Z5_X = self.Z_line_X * self.z5_reach
|
||||
|
||||
# Parametry charakterystyki poligonalnej
|
||||
self.Xr1f = self.Z_line_X * (1 + self.tanfi2 / self.tanfi1) * self.przekladnia
|
||||
self.Xr1Wf = self.Z_line_X * self.z1_reach * (1 + self.tanfi2 / self.tanfi1) * self.przekladnia
|
||||
|
||||
def init_state(self):
|
||||
"""Inicjalizacja stanów wewnętrznych"""
|
||||
# Timery dla każdej fazy i strefy (36 kanałów: 6 stref x 6 pętli)
|
||||
self.timers = {i: 0 for i in range(36)}
|
||||
|
||||
# Flagi trip dla różnych typów zwarć
|
||||
# Indeksy: 0-2=L1-E,L2-E,L3-E; 3-5=L1L2,L2L3,L3L1
|
||||
self.tripped = {
|
||||
'L1': False, 'L2': False, 'L3': False,
|
||||
'L1L2': False, 'L2L3': False, 'L3L1': False
|
||||
}
|
||||
|
||||
# Stany pobudzenia stref
|
||||
self.P = {i: False for i in range(36)}
|
||||
|
||||
# Stany prądowe
|
||||
self.Igr_state = [False, False, False]
|
||||
|
||||
# Liczniki dla funkcji sprawdz_P
|
||||
self.licznik = {i: 0 for i in range(36)}
|
||||
|
||||
# Blokady
|
||||
self.blokada_od_spz = 0
|
||||
self.blok_do_szyn = 0
|
||||
self.blok_do_linii = 0
|
||||
|
||||
# Stan nieustalony
|
||||
self.l_nieustalony = 5
|
||||
|
||||
# Kierunek
|
||||
self.Kp = False
|
||||
self.Km = False
|
||||
|
||||
def init_relay(self):
|
||||
"""Inicjalizacja zabezpieczenia"""
|
||||
print("Zabezpieczenie ZDistA zainicjalizowane")
|
||||
self.init_state()
|
||||
|
||||
def reset(self):
|
||||
"""Reset stanów"""
|
||||
self.init_state()
|
||||
|
||||
def _calculate_impedance(self, u_re, u_im, i_re, i_im):
|
||||
"""Oblicza impedancję Z = U/I"""
|
||||
i_mag_sq = i_re**2 + i_im**2
|
||||
if i_mag_sq < 1e-9:
|
||||
return 0.0, 0.0, 0.0
|
||||
|
||||
# Z = U / I = U * conj(I) / |I|^2
|
||||
z_re = (u_re * i_re + u_im * i_im) / i_mag_sq
|
||||
z_x = (u_im * i_re - u_re * i_im) / i_mag_sq
|
||||
z_mag = z_re**2 + z_x**2
|
||||
|
||||
return z_re, z_x, z_mag
|
||||
|
||||
def _calculate_loop_impedance(self, u_re, u_im, i_re, i_im, i0_re=0, i0_im=0, use_compensation=False):
|
||||
"""Oblicza impedancję pętli z opcjonalną kompensacją"""
|
||||
if use_compensation:
|
||||
# Kompensacja prądem ziemnozwarciowym
|
||||
# I = I_faza - K * I0
|
||||
i_comp_re = i_re - (self.ReK1 * i0_re - self.ImK1 * i0_im)
|
||||
i_comp_im = i_im - (self.ImK1 * i0_re + self.ReK1 * i0_im)
|
||||
else:
|
||||
i_comp_re = i_re
|
||||
i_comp_im = i_im
|
||||
|
||||
return self._calculate_impedance(u_re, u_im, i_comp_re, i_comp_im)
|
||||
|
||||
def in_polygon(self, R, X, reach_R, reach_X, angle_deg):
|
||||
"""
|
||||
Sprawdza czy punkt (R, X) jest wewnątrz charakterystyki wielokątnej
|
||||
"""
|
||||
# Obrót punktu
|
||||
angle_rad = math.radians(angle_deg)
|
||||
cos_a = math.cos(-angle_rad)
|
||||
sin_a = math.sin(-angle_rad)
|
||||
|
||||
R_rot = R * cos_a - X * sin_a
|
||||
X_rot = R * sin_a + X * cos_a
|
||||
|
||||
# Granice
|
||||
R_max = reach_R
|
||||
X_max = reach_X
|
||||
R_min = 0.1
|
||||
|
||||
# Sprawdzenie podstawowych granic
|
||||
if R_rot < R_min or R_rot > R_max:
|
||||
return False
|
||||
if abs(X_rot) > X_max:
|
||||
return False
|
||||
|
||||
# Sprawdzenie linii nachylonych (kąt fi1)
|
||||
X_upper = X_max * (1 - (R_rot / R_max) * math.tan(math.radians(90 - angle_deg + 10)))
|
||||
X_lower = -X_max * (1 - (R_rot / R_max) * math.tan(math.radians(90 - angle_deg + 10)))
|
||||
|
||||
return True
|
||||
|
||||
def _check_zone(self, z_re, z_x, z_mag, zone_R, zone_X, i_gr):
|
||||
"""Sprawdza czy impedancja jest w strefie"""
|
||||
if not i_gr:
|
||||
return False, False
|
||||
|
||||
# Sprawdzenie czy jest w wielokącie
|
||||
in_zone = self.in_polygon(z_re, z_x, zone_R, zone_X, self.line_angle)
|
||||
|
||||
# Dodatkowe kryterium: moduł
|
||||
zone_mag = zone_R**2 + zone_X**2
|
||||
|
||||
return in_zone and (z_mag < zone_mag), in_zone
|
||||
|
||||
def _sprawdz_P(self, idx, warunek_pobudzenia, warunek_powrotu):
|
||||
"""
|
||||
Funkcja sprawdz_P - symulacja detektora pobudzenia
|
||||
warunek_pobudzenia: True = zwiększ licznik
|
||||
warunek_powrotu: True = resetuj licznik
|
||||
"""
|
||||
prog_pobudzenia = 5 # Liczba cykli do pobudzenia
|
||||
prog_powrotu = 3
|
||||
|
||||
if warunek_pobudzenia:
|
||||
if self.licznik[idx] < prog_pobudzenia:
|
||||
self.licznik[idx] += 1
|
||||
if self.licznik[idx] >= prog_pobudzenia:
|
||||
self.P[idx] = True
|
||||
return True
|
||||
elif warunek_powrotu:
|
||||
if self.licznik[idx] > 0:
|
||||
self.licznik[idx] -= 1
|
||||
if self.licznik[idx] <= prog_powrotu:
|
||||
self.P[idx] = False
|
||||
|
||||
return self.P[idx]
|
||||
|
||||
def _check_direction(self, U_zg_re, U_zg_im, I_zg_re, I_zg_im, U_pr_re, U_pr_im, I_pr_re, I_pr_im):
|
||||
"""
|
||||
Sprawdza kierunek na podstawie mocy
|
||||
Używa składowej zgodnej lub przeciwnej
|
||||
"""
|
||||
# Kryterium mocy: P = Re(U * conj(I))
|
||||
power = U_zg_re * I_zg_re + U_zg_im * I_zg_im
|
||||
|
||||
if power > 0:
|
||||
self.Kp = True
|
||||
self.Km = False
|
||||
return 1 # Forward
|
||||
else:
|
||||
self.Kp = False
|
||||
self.Km = True
|
||||
return -1 # Backward
|
||||
|
||||
def step_relay(self, phase, u_re, u_im, i_re, i_im,
|
||||
u0_re, u0_im, i0_re, i0_im,
|
||||
u_zg_re, u_zg_im, i_zg_re, i_zg_im):
|
||||
"""
|
||||
Krok algorytmu dla jednej fazy
|
||||
phase: 'L1', 'L2' lub 'L3'
|
||||
Zwraca: 0 = brak trip, 1 = trip
|
||||
"""
|
||||
# Mapowanie fazy na indeks
|
||||
phase_map = {'L1': 0, 'L2': 1, 'L3': 2}
|
||||
phase_idx = phase_map.get(phase, 0)
|
||||
|
||||
# Sprawdź minimalne progi
|
||||
i_mag = math.sqrt(i_re**2 + i_im**2)
|
||||
u_mag = math.sqrt(u_re**2 + u_im**2)
|
||||
|
||||
if i_mag < self.I_min or u_mag < self.U_min:
|
||||
return 0
|
||||
|
||||
# Jeśli już wyłączone, nie sprawdzaj dalej
|
||||
if self.tripped[phase]:
|
||||
return 1
|
||||
|
||||
# Oblicz impedancję pętli faza-ziemia z kompensacją
|
||||
z_re, z_x, z_mag = self._calculate_loop_impedance(
|
||||
u_re, u_im, i_re, i_im, i0_re, i0_im, use_compensation=True
|
||||
)
|
||||
|
||||
# Sprawdź kierunek
|
||||
direction = self._check_direction(
|
||||
u_zg_re, u_zg_im, i_zg_re, i_zg_im,
|
||||
0, 0, 0, 0
|
||||
)
|
||||
|
||||
# Filtr kierunku
|
||||
if self.kierunek == 1 and direction < 0: # Do linii, ale jest wstecz
|
||||
return 0
|
||||
if self.kierunek == 2 and direction > 0: # Do szyn, ale jest w przód
|
||||
return 0
|
||||
|
||||
# Sprawdzenie kryterium prądowego
|
||||
i_sq = i_re**2 + i_im**2
|
||||
self.Igr_state[phase_idx] = i_sq > self.Igr
|
||||
|
||||
# Obsługa stanu nieustalonego
|
||||
if i_sq < 0.0001: # Bardzo mały prąd
|
||||
self.l_nieustalony = 5
|
||||
elif self.l_nieustalony > 0:
|
||||
self.l_nieustalony -= 1
|
||||
|
||||
trip = 0
|
||||
|
||||
# Sprawdzenie stref
|
||||
# Strefa 1 - natychmiastowa
|
||||
in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z1_R, self.Z1_X, self.Igr_state[phase_idx])
|
||||
if in_zone and self.l_nieustalony == 0:
|
||||
if self.t_z1 == 0:
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
else:
|
||||
idx = 0 * 3 + phase_idx # strefa 0-5, faza 0-2
|
||||
if self._sprawdz_P(idx, True, False):
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
|
||||
# Strefa 2 - opóźniona
|
||||
in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z2_R, self.Z2_X, self.Igr_state[phase_idx])
|
||||
if in_zone and self.l_nieustalony == 0:
|
||||
idx = 1 * 3 + phase_idx
|
||||
if self._sprawdz_P(idx, True, False):
|
||||
if self.timers[idx] < self.t_z2:
|
||||
self.timers[idx] += 1
|
||||
elif not self.tripped[phase]:
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
else:
|
||||
self.timers[idx] = 0
|
||||
|
||||
# Strefa 3 - rezerwowa
|
||||
in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z3_R, self.Z3_X, self.Igr_state[phase_idx])
|
||||
if in_zone and self.l_nieustalony == 0:
|
||||
idx = 2 * 3 + phase_idx
|
||||
if self._sprawdz_P(idx, True, False):
|
||||
if self.timers[idx] < self.t_z3:
|
||||
self.timers[idx] += 1
|
||||
elif not self.tripped[phase]:
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
else:
|
||||
self.timers[idx] = 0
|
||||
|
||||
# Strefa 4
|
||||
in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z4_R, self.Z4_X, self.Igr_state[phase_idx])
|
||||
if in_zone and self.l_nieustalony == 0:
|
||||
idx = 3 * 3 + phase_idx
|
||||
if self._sprawdz_P(idx, True, False):
|
||||
if self.timers[idx] < self.t_z4:
|
||||
self.timers[idx] += 1
|
||||
elif not self.tripped[phase]:
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
else:
|
||||
self.timers[idx] = 0
|
||||
|
||||
# Strefa 5
|
||||
in_zone, _ = self._check_zone(z_re, z_x, z_mag, self.Z5_R, self.Z5_X, self.Igr_state[phase_idx])
|
||||
if in_zone and self.l_nieustalony == 0:
|
||||
idx = 4 * 3 + phase_idx
|
||||
if self._sprawdz_P(idx, True, False):
|
||||
if self.timers[idx] < self.t_z5:
|
||||
self.timers[idx] += 1
|
||||
elif not self.tripped[phase]:
|
||||
self.tripped[phase] = True
|
||||
return 1
|
||||
else:
|
||||
self.timers[idx] = 0
|
||||
|
||||
return 0
|
||||
|
||||
def step_relay_phase(self, phase, u_re, u_im, i_re, i_im):
|
||||
"""
|
||||
Uproszczony krok algorytmu dla jednej fazy (bez składowych symetrycznych)
|
||||
"""
|
||||
return self.step_relay(phase, u_re, u_im, i_re, i_im,
|
||||
0, 0, 0, 0, # u0, i0
|
||||
u_re, u_im, i_re, i_im) # U_zg, I_zg (używamy fazowych)
|
||||
37
tester.py
37
tester.py
@@ -4,9 +4,32 @@ matplotlib.use('Agg') # Non-interactive backend for saving plots
|
||||
import matplotlib.pyplot as plt
|
||||
from comtrade import Comtrade
|
||||
import math
|
||||
from distance_algorithm import DistanceRelay
|
||||
import sys
|
||||
|
||||
# ============================================================
|
||||
# KONFIGURACJA - WYBIERZ ALGORYTM
|
||||
# ============================================================
|
||||
# Dostępne algorytmy:
|
||||
# 1 - distance_algorithm (DistanceRelay) - podstawowy
|
||||
# 2 - distance_algorithm_zimba (DistanceRelayZDistA) - bazowany na ZDistA_komp.c
|
||||
|
||||
ALGORITHM = 2 # <-- zmień tę wartość aby przełączyć algorytm
|
||||
|
||||
if ALGORITHM == 1:
|
||||
from distance_algorithm import DistanceRelay
|
||||
ALGORITHM_NAME = "distance_relay"
|
||||
ALGORITHM_DESC = "DistanceRelay (podstawowy)"
|
||||
elif ALGORITHM == 2:
|
||||
from distance_algorithm_zimba import DistanceRelayZDistA
|
||||
DistanceRelay = DistanceRelayZDistA
|
||||
ALGORITHM_NAME = "zdistA"
|
||||
ALGORITHM_DESC = "DistanceRelayZDistA (bazowany na ZDistA_komp.c)"
|
||||
else:
|
||||
raise ValueError(f"Nieznany algorytm: {ALGORITHM}")
|
||||
|
||||
print(f"=== Używany algorytm: {ALGORITHM_DESC} ===")
|
||||
# ============================================================
|
||||
|
||||
# Obsluga argumentow wiersza polecen
|
||||
if len(sys.argv) > 1:
|
||||
base_name = sys.argv[1] # Nazwa pliku bez rozszerzenia
|
||||
@@ -362,8 +385,8 @@ plt.legend()
|
||||
plt.grid(True)
|
||||
|
||||
plt.tight_layout()
|
||||
plt.savefig('test_result.png')
|
||||
print("Wynik zapisany do test_result.png")
|
||||
plt.savefig(f'test_result_{ALGORITHM_NAME}.png')
|
||||
print(f"Wynik zapisany do test_result_{ALGORITHM_NAME}.png")
|
||||
|
||||
# Generowanie pliku rezultat.md
|
||||
def detect_fault_and_generate_report(t, trip_history, z_r_history, z_x_history,
|
||||
@@ -434,10 +457,10 @@ def detect_fault_and_generate_report(t, trip_history, z_r_history, z_x_history,
|
||||
|
||||
# Zapisz raport
|
||||
report_content = "\n".join(report)
|
||||
with open('rezultat.md', 'w', encoding='utf-8') as f:
|
||||
with open(f'rezultat_{ALGORITHM_NAME}.md', 'w', encoding='utf-8') as f:
|
||||
f.write(report_content)
|
||||
|
||||
print("Raport zapisany do rezultat.md")
|
||||
print(f"Raport zapisany do rezultat_{ALGORITHM_NAME}.md")
|
||||
return fault_idx is not None
|
||||
|
||||
# Generuj raport
|
||||
@@ -535,7 +558,7 @@ result_md = generate_result_md(
|
||||
Z_line_R, Z_line_X, relay
|
||||
)
|
||||
|
||||
with open('rezultat.md', 'w', encoding='utf-8') as f:
|
||||
with open(f'rezultat_{ALGORITHM_NAME}.md', 'w', encoding='utf-8') as f:
|
||||
f.write(result_md)
|
||||
|
||||
print("Zapisano rezultat.md")
|
||||
print(f"Zapisano rezultat_{ALGORITHM_NAME}.md")
|
||||
|
||||
Reference in New Issue
Block a user