Source code for snpio.io.genepop_reader

from pathlib import Path
from typing import List, Literal, Optional, cast

import numpy as np

from snpio.read_input.genotype_data import GenotypeData
from snpio.utils.custom_exceptions import (
    AlignmentFileNotFoundError,
    AlignmentFormatError,
    SequenceLengthError,
)
from snpio.utils.logging import LoggerManager
from snpio.utils.misc import IUPAC


[docs] class GenePopReader(GenotypeData): """Reads GenePop-formatted files into a GenotypeData object. Supports: - 2-digit and 3-digit allele codings - Mixed ploidy: haploid and diploid entries - Missing data encoded as 0000, 000000, or partial (e.g., 1000, 0010) - Flexible locus headers (comma-separated or newline-separated) Example: >>> gp = GenePopReader("example.gen", popmapfile="pops.txt") >>> gp.snp_data array([...]) """
[docs] def __init__( self, filename: str, popmapfile: Optional[str] = None, allele_encoding: dict[str, str] | None = None, force_popmap: bool = False, exclude_pops: Optional[List[str]] = None, include_pops: Optional[List[str]] = None, plot_format: str = "png", prefix: str = "snpio", verbose: bool = False, debug: bool = False, ) -> None: """Initialize the GenePopReader. This class reads a GenePop file and extracts genotype data, populations, and sample information. Args: filename (str): Path to the GenePop file. popmapfile (Optional[str]): Path to the population map file. allele_encoding (Optional[dict[str, str]]): Mapping of allele codes to IUPAC symbols. force_popmap (bool): Whether to enforce population mapping. exclude_pops (Optional[List[str]]): List of populations to exclude. include_pops (Optional[List[str]]): List of populations to include. plot_format (str): Format for output plots (default: "png"). prefix (str): Prefix for output files (default: "snpio"). verbose (bool): Whether to enable verbose logging (default: False). debug (bool): Whether to enable debug mode (default: False). """ kwargs = {"prefix": prefix, "verbose": verbose, "debug": debug} logman = LoggerManager(name=__name__, **kwargs) self.logger = logman.get_logger() self.iupac = IUPAC(logger=self.logger) self.verbose = verbose self.debug = debug self.resource_data = {} pfmt = "png" if plot_format is None else plot_format pfmt = cast(Literal["png", "jpg", "pdf", "jpeg"], pfmt) self.allele_encoding = allele_encoding or { "01": "A", "02": "C", "03": "G", "04": "T", "001": "A", "002": "C", "003": "G", "004": "T", } super().__init__( filename=filename, filetype="genepop", popmapfile=popmapfile, force_popmap=force_popmap, exclude_pops=exclude_pops, include_pops=include_pops, plot_format=pfmt, prefix=prefix, verbose=verbose, logger=self.logger, debug=debug, )
def load_aln(self) -> None: """Load the GenePop file and parse genotype data.""" if self.filename is None: msg = "No GenePop filename provided." self.logger.error(msg) raise AlignmentFileNotFoundError(msg) path = Path(self.filename) if not path.exists(): msg = f"GenePop file {self.filename} not found." self.logger.error(msg) raise AlignmentFileNotFoundError(msg) self.logger.info(f"Reading GenePop file {self.filename}...") with open(path, "r") as f: lines = [line.strip() for line in f if line.strip()] if not lines: msg = f"GenePop file {self.filename} is empty or malformed." self.logger.error(msg) raise AlignmentFormatError(msg) # Skip title line lines = lines[1:] # Extract marker names self.marker_names = [] while lines and not lines[0].upper().startswith("POP"): line = lines.pop(0) self.marker_names.extend( [m.strip() for m in line.replace(",", " ").split() if m.strip()] ) # Parse genotypes samples, populations, genotypes = [], [], [] current_pop = None pop_count = 0 for line in lines: if line.upper() == "POP": pop_count += 1 current_pop = f"Pop{pop_count}" continue try: sid, gts = line.split(",", 1) sid = sid.strip() allele_strs = gts.strip().split() if not allele_strs: msg = f"Sample {sid} has no genotype data." self.logger.error(msg) raise AlignmentFormatError(msg) samples.append(sid) populations.append(current_pop) gt_calls = [] for code in allele_strs: code = code.strip() if not code or not code.isdigit(): msg = f"Invalid genotype format: '{code}'" self.logger.error(msg) raise AlignmentFormatError(msg) # Allow for haploid 3-digit codes (length 3) if len(code) not in {2, 3, 4, 6}: msg = f"Unexpected allele length for sample {sid}: {code}" self.logger.error(msg) raise AlignmentFormatError(msg) # Determine ploidy and convert to IUPAC if len(code) == 2 or len(code) == 3: a1, a2 = code, code elif len(code) == 4: # Diploid, 2-digit codes a1, a2 = code[:2], code[2:] elif len(code) == 6: # Diploid, 3-digit codes a1, a2 = code[:3], code[3:] else: msg = f"Invalid allele code length for sample {sid}: '{code}'" self.logger.error(msg) raise AlignmentFormatError(msg) gt_calls.append(self._convert_to_iupac(a1, a2)) genotypes.append(gt_calls) except ValueError: msg = f"Malformed line in GenePop file: '{line}'" self.logger.error(msg) raise AlignmentFormatError(msg) self.samples = samples self.populations = populations self.snp_data = np.array(genotypes, dtype="<U1") if len(set(map(len, genotypes))) > 1: msg = "Inconsistent number of loci across samples." self.logger.error(msg) raise SequenceLengthError(msg) self._ref, self._alt = self.get_ref_alt_alleles(self.snp_data) self.logger.info( f"Loaded {len(samples)} samples across {len(set(populations))} populations." ) self.logger.info(f"Found {self.num_snps} SNPs.") def _convert_to_iupac(self, allele1: str, allele2: str) -> str: """Convert two alleles to IUPAC code.""" missing_vals = {"00", "000", "99", "999"} if allele1 in missing_vals or allele2 in missing_vals: return "N" try: return self.iupac.encode_genepop_pair( allele1, allele2, allele_map=self.allele_encoding ) except Exception as e: self.logger.warning( f"Failed to convert alleles {allele1}/{allele2} to IUPAC: {e}" ) return "N"