Source code for bts

"""Core backend logic for lightning data processing in the HLMA application.

This module provides functions for reading, processing, and analyzing
lightning datasets from LYLOUT and ENTLN sources. It includes algorithms
for detecting and grouping lightning flashes, as well as utilities for
preprocessing and structuring the data for visualization and analysis
in the HLMA GUI.

Functions
---------
zipped_lylout_reader(file, skiprows=55)
    Reads a compressed LYLOUT `.dat.gz` file. Used by `open_lylout`.

lylout_reader(file, skiprows=55)
    Reads a plain-text LYLOUT `.dat` file. Used by `open_lylout`.

open_lylout(files)
    Reads multiple LYLOUT files in parallel and combines them into a single
    DataFrame, also returning the LMA station coordinates.

entln_reader(file, min_date)
    Reads a single ENTLN CSV file. Used by `open_entln`.

open_entln(files, min_date)
    Reads multiple ENTLN CSV files and combines them into a single DataFrame.

dot_to_dot(env)
    Applies the dot-to-dot flash detection algorithm on lightning data.

mc_caul(env)
    Applies the McCaul flash detection algorithm on lightning data.

Notes
-----
- Reader functions are used by `open_lylout` and `open_entln` to process multiple files.
- Other functions are mostly internal and intended for use by the HLMA application.
- The flash detection algorithms are experimental and should be interpreted with caution.
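
Examples
--------
Hypothetical usage of the readers (the file names are illustrative, and passing
the LYLOUT minimum time as the ENTLN reference is an assumption about how the
GUI wires these calls together):

>>> lyl, stations = open_lylout(["LYLOUT_240101_000000_0600.dat.gz"])  # doctest: +SKIP
>>> strikes = open_entln(["entln_pulse_20240101.csv"], lyl["datetime"].min())  # doctest: +SKIP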

"""


import gzip
import logging
import re
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from pyproj import Transformer

from setup import State

warnings.filterwarnings("ignore")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger("bts.py")
logger.setLevel(logging.DEBUG)

def zipped_lylout_reader(file: str, skiprows: int = 55) -> pd.DataFrame | None:
    """Read a compressed LYLOUT .dat.gz file into a DataFrame.

    Used by `open_lylout`. Parses the file, calculates the number of
    contributing stations from the mask, extracts the date from the filename,
    computes absolute datetimes, and initializes a `flash_id` column.

    Parameters
    ----------
    file : str
        Path to the `.dat.gz` LYLOUT file.
    skiprows : int, optional
        Number of initial rows to skip (default is 55).

    Returns
    -------
    pd.DataFrame or None
        DataFrame with columns:
        ["datetime", "lat", "lon", "alt", "chi", "pdb",
         "number_stations", "utc_sec", "mask", "flash_id"].
        Returns `None` if the file could not be read.

    """
    try:
        with gzip.open(file, "rt") as f:
            tmp = pd.read_csv(f, skiprows=skiprows, header=None,
                              names=["utc_sec", "lat", "lon", "alt", "chi", "pdb", "mask"],
                              sep=r"\s+")
        tmp["number_stations"] = tmp["mask"].apply(lambda x: int(x, 16).bit_count())
        tmp_date = re.match(r".*\w+_(\d+)_\d+_\d+\.dat\.gz", file).group(1)
        tmp["datetime"] = pd.to_datetime(tmp_date, format="%y%m%d") + pd.to_timedelta(tmp.utc_sec, unit="s")
        tmp["flash_id"] = -1
        return tmp[["datetime", "lat", "lon", "alt", "chi", "pdb", "number_stations",
                    "utc_sec", "mask", "flash_id"]].reset_index(drop=True)
    except Exception as e:
        logger.warning("Could not open %s due to %s", file, e)
        return None

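# Illustrative note (not part of the original source): "number_stations" above is
# the population count of the hexadecimal station mask. For a hypothetical mask
# value "1b3":
#
#     int("1b3", 16)              # 435, i.e. 0b110110011
#     int("1b3", 16).bit_count()  # 6 contributing stations

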
def lylout_reader(file: str, skiprows: int = 55) -> pd.DataFrame | None:
    """Read a plain-text LYLOUT .dat file into a DataFrame.

    Used by `open_lylout`. Parses the file, calculates the number of
    contributing stations from the mask, extracts the date from the filename,
    computes absolute datetimes, and initializes a `flash_id` column.

    Parameters
    ----------
    file : str
        Path to the LYLOUT `.dat` file.
    skiprows : int, optional
        Number of initial rows to skip (default is 55).

    Returns
    -------
    pd.DataFrame or None
        DataFrame with columns:
        ["datetime", "lat", "lon", "alt", "chi", "pdb",
         "number_stations", "utc_sec", "mask", "flash_id"].
        Returns `None` if the file could not be read.

    """
    try:
        tmp = pd.read_csv(file, skiprows=skiprows, header=None,
                          names=["utc_sec", "lat", "lon", "alt", "chi", "pdb", "mask"],
                          sep=r"\s+")
        tmp["number_stations"] = tmp["mask"].apply(lambda x: int(x, 16).bit_count())
        tmp_date = re.match(r".*\w+_(\d+)_\d+_\d+\.dat", file).group(1)
        tmp["datetime"] = pd.to_datetime(tmp_date, format="%y%m%d") + pd.to_timedelta(tmp.utc_sec, unit="s")
        tmp["flash_id"] = -1
        tmp = tmp[["datetime", "lat", "lon", "alt", "chi", "pdb", "number_stations",
                   "utc_sec", "mask", "flash_id"]]
        tmp = tmp.reset_index(drop=True)
    except Exception as e:
        logger.warning("Could not open %s due to %s", file, e)
        return None
    else:
        return tmp

def open_lylout(files: list[str]) -> tuple[pd.DataFrame, np.ndarray]:
    """Read multiple LYLOUT files (compressed or plain) into a single DataFrame.

    Determines the number of header rows by inspecting the first file,
    extracts LMA station coordinates, reads all files in parallel,
    concatenates the results, and computes a 'seconds' column relative to the
    first midnight.

    Parameters
    ----------
    files : list of str
        List of paths to LYLOUT files (.dat or .dat.gz).

    Returns
    -------
    tuple
        Tuple containing:

        - `pd.DataFrame`: All LYLOUT data concatenated.
        - `np.ndarray`: LMA station coordinates as float32, shape (n_stations, 2).

    """
    # manually read the first file to establish skiprows and LMA station info
    lma_stations = []
    skiprows = None
    logger.info("Starting to read LYLOUT files.")
    if files[0].endswith(".dat.gz"):
        with gzip.open(files[0], "rt") as f:
            for i, line in enumerate(f):
                if line.strip().startswith("Sta_info:"):
                    parts = line.strip().split()
                    lon = float(parts[-5])
                    lat = float(parts[-6])
                    lma_stations.append((lon, lat))
                if line.startswith("*** data ***"):
                    skiprows = i + 1
                    break
        lylout_read = Parallel(n_jobs=-5)(delayed(zipped_lylout_reader)(f, skiprows=skiprows) for f in files)
    else:
        with Path(files[0]).open() as f:
            for i, line in enumerate(f):
                if line.strip().startswith("Sta_info:"):
                    parts = line.strip().split()
                    lon = float(parts[-5])
                    lat = float(parts[-6])
                    lma_stations.append((lon, lat))
                if line.startswith("*** data ***"):
                    skiprows = i + 1
                    break
        lylout_read = Parallel(n_jobs=-5)(delayed(lylout_reader)(f, skiprows=skiprows) for f in files)
    all_data = pd.concat(lylout_read, ignore_index=True)
    all_data["seconds"] = (all_data["datetime"] - all_data["datetime"].min().normalize()).dt.total_seconds()
    return all_data, np.array(lma_stations, dtype=np.float32)

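# Worked example (illustrative dates) of the "seconds" column above: the reference
# is the midnight of the earliest datetime in the combined data, so with data
# starting on 2024-01-01, a source at 2024-01-01 06:30:00 UTC maps to
# 6.5 * 3600 = 23400.0 s and a source at 2024-01-02 00:00:00 UTC maps to 86400.0 s.

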
def open_entln(files: list[str], min_date: pd.Timestamp) -> pd.DataFrame:
    """Read multiple ENTLN CSV files and combine them into a single DataFrame.

    Uses `entln_reader` to process each file, then concatenates the results
    and computes a 'seconds' column relative to the first midnight of the
    dataset.

    Parameters
    ----------
    files : list of str
        List of paths to ENTLN CSV files.
    min_date : pd.Timestamp
        Reference datetime passed to `entln_reader` for computing UTC seconds.

    Returns
    -------
    pd.DataFrame
        Combined DataFrame containing all ENTLN data with a computed
        'seconds' column.

    """
    logger.info("Received min time of %s", min_date)
    entln_read = Parallel(n_jobs=-5)(delayed(entln_reader)(f, min_date) for f in files)
    all_data = pd.concat(entln_read, ignore_index=True)
    all_data["seconds"] = (all_data["datetime"] - all_data["datetime"].min().normalize()).dt.total_seconds()
    return all_data

def entln_reader(file: str, min_date: pd.Timestamp) -> pd.DataFrame | None:
    """Read a single ENTLN CSV file and format it to match LYLOUT data.

    Used by `open_entln`. Converts timestamps, renames columns to standard
    names, computes UTC seconds, and returns a DataFrame with the essential
    columns.

    Parameters
    ----------
    file : str
        Path to the ENTLN CSV file.
    min_date : pd.Timestamp
        Reference datetime for computing UTC seconds.

    Returns
    -------
    pd.DataFrame or None
        DataFrame with columns:
        ["datetime", "lat", "lon", "alt", "peakcurrent",
         "numbersensors", "utc_sec", "type"].
        Returns `None` if the file cannot be read or processed.

    """
    try:
        tmp = pd.read_csv(file)
        tmp["timestamp"] = pd.to_datetime(tmp["timestamp"])
        tmp["type"] = pd.to_numeric(tmp["type"])
        # rename to match the LYLOUT column names
        tmp = tmp.rename(columns={
            "timestamp": "datetime",
            "latitude": "lat",
            "longitude": "lon",
            "icheight": "alt",
        })
        tmp["utc_sec"] = (tmp["datetime"] - min_date).dt.total_seconds()
    except Exception as e:
        logger.warning("Failed on %s due to %s", file, e)
        return None
    else:
        return tmp[["datetime", "lat", "lon", "alt", "peakcurrent", "numbersensors", "utc_sec", "type"]]

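# Illustrative note: `entln_reader` above assumes the ENTLN CSV exposes at least the
# columns "timestamp", "latitude", "longitude", "icheight", "peakcurrent",
# "numbersensors", and "type". A minimal input could look like the following
# (the row values are made up for illustration):
#
#     timestamp,latitude,longitude,icheight,peakcurrent,numbersensors,type
#     2024-01-01T06:30:00.123Z,29.71,-95.34,8200.0,-12543.0,11,1

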
def dot_to_dot(env: State) -> None:
    """Apply the dot-to-dot flash detection algorithm on lightning data.

    Groups lightning events in space and time to identify flashes, updates
    the `flash_id` column in the dataset, and projects data to ECEF
    coordinates. Computation is parallelized for speed.

    Parameters
    ----------
    env : State
        State object containing lightning data (`env.all`) and station
        coordinates (`env.stations`).

    Returns
    -------
    None

    """
    logger.info("Starting dot to dot flashing.")

    # unpacking
    lyl = env.all[env.plot]
    env.all["flash_id"] = -1  # reset global flash data to avoid inconsistencies
    lma_stations = env.stations
    lon_0, lat_0 = tuple(sum(coords) / len(lma_stations) for coords in zip(*lma_stations, strict=True))
    distance_threshold = 3000  # in meters
    time_threshold = 0.15  # in seconds

    # projecting
    to_ecef = Transformer.from_crs("EPSG:4326", "EPSG:4978", always_xy=True)
    x_0, y_0, z_0 = to_ecef.transform(lon_0, lat_0, 0)
    xs, ys, zs = to_ecef.transform(lyl.lon, lyl.lat, lyl.alt)
    lyl["x"], lyl["y"], lyl["z"] = xs - x_0, ys - y_0, zs - z_0
    timethreshold_ns = int(pd.Timedelta(seconds=time_threshold).value)
    distancethreshold_2 = distance_threshold**2

    def dtd_flasher(df: pd.DataFrame) -> np.ndarray:
        fid = 0
        remaining = np.ones(len(df), dtype=bool)
        datetimes = df["datetime"].to_numpy(dtype=np.int64)
        xys = df[["x", "y"]].to_numpy()
        indices = df.index.to_numpy()
        flash_id = np.full(len(df), -1)
        while remaining.any():
            candidates = np.flatnonzero(remaining)
            candidates_dts = datetimes[candidates]
            candidates_xys = xys[candidates]
            candidates_ids = indices[candidates]
            flash_mask = np.zeros(len(candidates), dtype=bool)
            flash_mask[0] = True
            consideration = (candidates_dts - candidates_dts[0]) <= timethreshold_ns
            consideration[0] = False
            concan = np.flatnonzero(consideration)
            lyst = list(concan)
            syt = set(concan)
            for i in lyst:
                if not flash_mask[i]:
                    flash_indices = np.where(flash_mask)[0]
                    if np.any((np.sum((candidates_xys[flash_indices] - candidates_xys[i])**2, axis=1) <= distancethreshold_2)
                              & ((candidates_dts[i] - candidates_dts[flash_indices]) > 0)
                              & ((candidates_dts[i] - candidates_dts[flash_indices]) <= timethreshold_ns)):
                        flash_mask[i] = True
                        consideration = (((candidates_dts - candidates_dts[flash_mask].max()) > 0)
                                         & ((candidates_dts - candidates_dts[flash_mask].max()) <= timethreshold_ns)
                                         & (~flash_mask))
                        newconcan = set(np.flatnonzero(consideration)) - syt
                        syt.update(newconcan)
                        lyst.extend(newconcan)
            update = candidates_ids[flash_mask]
            remaining[update] = False
            flash_id[update] = fid
            fid += 1
        return flash_id

    gap = lyl["datetime"].astype("int64").diff() > timethreshold_ns
    group_ids = gap.cumsum()
    dfs = [group.reset_index(drop=True).copy() for _, group in lyl.groupby(group_ids) if len(group) > 0]
    results = Parallel(n_jobs=-10, backend="loky")(delayed(dtd_flasher)(df) for df in dfs)
    offset = 0
    for i, res in enumerate(results):
        results[i] = res + offset
        offset = results[i].max() + 1
    env.all.loc[lyl.index, "flash_id"] = np.concatenate(results)
    logger.info("Finished dot to dot flashing.")

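# Worked example (illustrative) of the pre-grouping used above: sources separated by
# more than `time_threshold` can never join the same flash, so the data is split at
# those gaps before parallel processing. With event times (in seconds)
# [0.00, 0.05, 0.40, 0.45, 1.00] and a 0.15 s threshold:
#
#     gaps      = [False, False, True, False, True]   # diff() > threshold
#     group_ids = [0, 0, 1, 1, 2]                     # gaps.cumsum()
#
# yielding three independent chunks handed to `dtd_flasher`, whose flash ids are then
# offset so they stay unique across chunks.

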
def mc_caul(env: State) -> None:
    """Apply the McCaul flash detection algorithm on lightning data.

    Groups lightning events using distance, time, and azimuth thresholds,
    updates the `flash_id` column in the dataset, and projects data to ECEF
    coordinates. Computation is parallelized for speed.

    Parameters
    ----------
    env : State
        State object containing lightning data (`env.all`) and station
        coordinates (`env.stations`).

    Returns
    -------
    None

    """
    logger.info("Starting McCaul flashing.")

    # unpacking
    lyl = env.all[env.plot]
    env.all["flash_id"] = -1  # reset global flash data to avoid inconsistencies
    lma_stations = env.stations
    lon_0, lat_0 = tuple(sum(coords) / len(lma_stations) for coords in zip(*lma_stations, strict=True))
    time_threshold = 0.15  # in seconds
    azimuth_threshold = 0.05  # in radians

    # projecting
    to_ecef = Transformer.from_crs("EPSG:4326", "EPSG:4978", always_xy=True)
    x_0, y_0, z_0 = to_ecef.transform(lon_0, lat_0, 0)
    xs, ys, zs = to_ecef.transform(lyl.lon, lyl.lat, lyl.alt)
    lyl["x"], lyl["y"], lyl["z"] = xs - x_0, ys - y_0, zs - z_0
    timethreshold_ns = int(pd.Timedelta(seconds=time_threshold).value)

    def mcc_flasher(df: pd.DataFrame) -> np.ndarray:
        fid = 0
        remaining = np.ones(len(df), dtype=bool)
        datetimes = df["datetime"].to_numpy(dtype=np.int64)
        xys = df[["x", "y"]].to_numpy()
        azimuths = np.arctan2(df["y"].to_numpy(), df["x"].to_numpy())
        indices = df.index.to_numpy()
        flash_id = np.full(len(df), -1)
        while remaining.any():
            candidates = np.flatnonzero(remaining)
            candidates_dts = datetimes[candidates]
            candidates_xys = xys[candidates]
            candidates_azs = azimuths[candidates]
            candidates_ids = indices[candidates]
            flash_mask = np.zeros(len(candidates), dtype=bool)
            flash_mask[0] = True
            consideration = (candidates_dts - candidates_dts[0]) <= timethreshold_ns
            consideration[0] = False
            concan = np.flatnonzero(consideration)
            lyst = list(concan)
            syt = set(concan)
            for i in lyst:
                if not flash_mask[i]:
                    distancethreshold_2 = 9000000 * ((candidates_xys[i][0]**2 + candidates_xys[i][1]**2) / 100000**2)**2  # FIXME: worsens with r^2 from the LMA center
                    flash_indices = np.where(flash_mask)[0]
                    azimuth_diff = np.abs(candidates_azs[flash_indices] - candidates_azs[i])
                    if np.any((np.sum((candidates_xys[flash_indices] - candidates_xys[i])**2, axis=1) <= distancethreshold_2)
                              & ((candidates_dts[i] - candidates_dts[flash_indices]) > 0)
                              & ((candidates_dts[i] - candidates_dts[flash_indices]) <= timethreshold_ns)
                              & (np.minimum(azimuth_diff, 2 * np.pi - azimuth_diff) <= azimuth_threshold)):
                        flash_mask[i] = True
                        consideration = (((candidates_dts - candidates_dts[flash_mask].max()) > 0)
                                         & ((candidates_dts - candidates_dts[flash_mask].max()) <= timethreshold_ns)
                                         & (~flash_mask))
                        newconcan = set(np.flatnonzero(consideration)) - syt
                        syt.update(newconcan)
                        lyst.extend(newconcan)
            update = candidates_ids[flash_mask]
            remaining[update] = False
            flash_id[update] = fid
            fid += 1
        return flash_id

    gap = lyl["datetime"].astype("int64").diff() > timethreshold_ns
    group_ids = gap.cumsum()
    dfs = [group.reset_index(drop=True).copy() for _, group in lyl.groupby(group_ids) if len(group) > 0]
    results = Parallel(n_jobs=-10, backend="loky")(delayed(mcc_flasher)(df) for df in dfs)
    offset = 0
    for i, res in enumerate(results):
        results[i] = res + offset
        offset = results[i].max() + 1
    env.all.loc[lyl.index, "flash_id"] = np.concatenate(results)
    logger.info("Finished McCaul flashing.")

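# Worked example (illustrative) of the range-dependent spatial threshold in
# `mcc_flasher` above: with r^2 = x^2 + y^2 measured from the network center,
# the squared threshold is
#
#     9_000_000 * (r**2 / 100_000**2)**2  # in m^2
#
# i.e. a linear separation threshold of 3 km * (r / 100 km)**2:
#
#     r =  50 km -> 0.75 km
#     r = 100 km -> 3 km
#     r = 200 km -> 12 km
#
# so the allowed separation grows quadratically with distance from the LMA center,
# which is the behaviour flagged by the FIXME.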