"""
This file contains the high level GPX object.
"""
from __future__ import annotations
import io
import warnings
from datetime import datetime, timedelta
from math import degrees
from pathlib import Path
from typing import IO, Any, Optional
from zipfile import ZipFile
from zoneinfo import ZoneInfo
import narwhals as nw
import pandas as pd
import polars as pl
from narwhals.typing import IntoFrameT
from timezonefinder import TimezoneFinder
from .complex_types import (
Extensions,
Gpx,
Link,
Trk,
Trkseg,
Wpt,
)
from .constants.precisions import DEFAULT_PRECISION_DICT, DEFAULT_TIME_FORMAT
from .parsers.fit_parser import FitParser
from .parsers.gpx_parser import GPXParser
from .parsers.kml_parser import KMLParser
from .simple_types import Latitude, Longitude
from .utils import (
EARTH_RADIUS,
check_xml_extensions_schemas,
check_xml_schema,
haversine_distance,
instance_from_str,
is_dataframe,
ramer_douglas_peucker,
split_attributes,
)
from .writers.gpx_writer import GPXWriter
from .writers.kml_writer import KMLWriter
[docs]
class GPX:
"""
High level GPX object.
"""
TIME_VALUES = ["time", "speed", "pace", "ascent_speed"]
ELEVATION_VALUES = ["ele", "ascent_rate", "ascent_speed"]
def __init__(
    self,
    source: Optional[str | Path | IO[str] | IO[bytes] | bytes | IntoFrameT] = None,
    xml_schema: bool = True,
    xml_extensions_schemas: bool = False,
) -> None:
    """
    Initialise high level GPX object.

    Args:
        source (str | Path | IO[str] | IO[bytes] | bytes | IntoFrameT, optional):
            Path to a .gpx, .kml, .kmz, .fit or .csv file, or a
            dataframe. Defaults to None, which creates an empty GPX
            instance.
        xml_schema (bool, optional): Toggle schema verification
            during parsing. Defaults to True.
        xml_extensions_schemas (bool, optional): Toggle extensions
            schema verification during parsing. Requires an internet
            connection and is not guaranteed to work. Defaults to False.

    Raises:
        ValueError: If the file extension is not one of the supported ones.
        FileNotFoundError: If a KMZ archive does not contain doc.kml.
        NotImplementedError: If source is a bytes or file-like object.
        TypeError: If source is of any other unsupported type.
    """
    # GPX file description
    self.source: str | Path | IO[str] | IO[bytes] | bytes | IntoFrameT = None

    # GPX file content
    self.gpx: Gpx = None
    self.xmlns: dict = {}
    self.xsi_schema_location: dict = {}
    self._ele_data: bool = False
    self._time_data: bool = False
    self._time_zone: str = None
    self._precisions: dict = DEFAULT_PRECISION_DICT
    self._time_format: str = DEFAULT_TIME_FORMAT
    self._extensions_fields: dict = {}

    # Markers telling which per-point values were already computed
    self._dist_from_start = False
    self._ascent_rate = False
    self._speed: bool = False
    self._pace: bool = False
    self._ascent_speed: bool = False

    # Empty source - Create empty GPX instance for advanced use
    if source is None:
        self._init_from_none()

    # File path: the parser is selected from the file extension
    elif isinstance(source, (str, Path)):
        self.source = Path(source)
        suffix = self.source.suffix.lower()

        if suffix == ".gpx":
            self._init_from_gpx(xml_schema, xml_extensions_schemas)
        elif suffix == ".kml":
            # Forward the schema toggles, as done for GPX and KMZ files;
            # they were silently replaced by the defaults before.
            self._init_from_kml(xml_schema, xml_extensions_schemas)
        elif suffix == ".kmz":
            self._init_from_kmz(xml_schema, xml_extensions_schemas)
        elif suffix == ".fit":
            self._init_from_fit()
        elif suffix == ".csv":
            self._init_from_csv()
        # Unsupported file extension
        else:
            raise ValueError(
                f"Unable to parse this type of file: {source}. "
                "Consider renaming your file with the proper file extension."
            )

    # Bytes and file-like objects
    elif isinstance(
        source, (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, bytes)
    ):
        raise NotImplementedError(
            "Initialising GPX from byte-like object is not implemented yet."
        )  # TODO treat each case independently

    # Dataframe
    elif is_dataframe(source):
        self._init_from_dataframe(source)

    # Invalid
    else:
        raise TypeError(
            f"Invalid source type: {type(source)}. "
            "Expected str, Path, IO[str], IO[bytes], bytes, pd.DataFrame or pl.DataFrame."
        )
def _init_from_none(self):
    """
    Set up an empty GPX tree when no source is given.

    Emits a UserWarning, then stores a bare `Gpx` element built with
    the arguments "1.1" and "ezGPX".
    """
    message = "No file path provided, creating an empty GPX instance."
    warnings.warn(message, UserWarning)
    self.gpx = Gpx("1.1", "ezGPX")
def _init_from_gpx(self, xml_schema: bool = True, xml_extensions_schemas: bool = False):
    """
    Populate this instance from the GPX file stored in `self.source`.

    Args:
        xml_schema (bool, optional): Forwarded to `GPXParser`; toggles
            schema verification during parsing. Defaults to True.
        xml_extensions_schemas (bool, optional): Forwarded to
            `GPXParser`; toggles extensions schema verification during
            parsing. Defaults to False.
    """
    parser = GPXParser(self.source, xml_schema, xml_extensions_schemas)
    parsed = parser.parse()

    # (attribute on self, key in the parser result), copied in this order
    transfers = (
        ("gpx", "gpx"),
        ("xmlns", "xmlns"),
        ("xsi_schema_location", "xsi_schema_location"),
        ("_ele_data", "ele_data"),
        ("_time_data", "time_data"),
        ("_precisions", "precisions"),
        ("_time_format", "time_format"),
        ("_extensions_fields", "extensions_fields"),
    )
    for attribute, key in transfers:
        setattr(self, attribute, parsed[key])
def _init_from_kml(self, xml_schema: bool = True, xml_extensions_schemas: bool = False):
    """
    Populate this instance from the KML content stored in `self.source`.

    Args:
        xml_schema (bool, optional): Forwarded to `KMLParser`; toggles
            schema verification during parsing. Defaults to True.
        xml_extensions_schemas (bool, optional): Forwarded to
            `KMLParser`; toggles extensions schema verification during
            parsing. Defaults to False.
    """
    parser = KMLParser(self.source, xml_schema, xml_extensions_schemas)
    parsed = parser.parse()

    self.gpx = parsed["gpx"]
    # Namespaces, elevation/time flags and extension fields are not
    # taken from the parser result: they are reset to fixed values.
    self.xmlns = {}
    self._ele_data = False
    self._time_data = False
    self._precisions = parsed["precisions"]
    self._time_format = parsed["time_format"]
    self._extensions_fields = None
def _init_from_kmz(
self, xml_schema: bool = True, xml_extensions_schemas: bool = False
):
"""
Initialise GPX instance from KMZ file.
Args:
xml_schemas (bool, optional): Toggle schema
verification during parsing. Defaults to True.
xml_extensions_schemas (bool, optional): Toggle extensions
schema verificaton durign parsing. Requires internet connection
connection and is not guaranted to work. Defaults to False.
"""
with ZipFile(self.source, "r") as kmz:
kmls = [
info.filename
for info in kmz.infolist()
if info.filename.endswith(".kml")
]
if "doc.kml" not in kmls:
raise FileNotFoundError(
f"Unable to parse file: {self.source}"
"Expected to find doc.kml inside KMZ file."
)
with kmz.open("doc.kml", "r") as kml_file:
self.source = io.BytesIO(kml_file.read())
self._init_from_kml(xml_schema, xml_extensions_schemas)
def _init_from_fit(self):
    """
    Populate this instance from the FIT file stored in `self.source`.
    """
    parsed = FitParser(self.source).parse()

    self.gpx = parsed["gpx"]
    # Namespaces, elevation/time flags and extension fields are not
    # taken from the parser result: they are reset to fixed values.
    self.xmlns = {}
    self._ele_data = False
    self._time_data = False
    self._precisions = parsed["precisions"]
    self._time_format = parsed["time_format"]
    self._extensions_fields = None
def _init_from_csv(self):
    """
    Populate this instance from the CSV file stored in `self.source`.

    The file is read with Polars and the resulting dataframe is passed
    to `_init_from_dataframe`.
    """
    frame = pl.read_csv(self.source)
    self._init_from_dataframe(frame)
def _init_from_dataframe(self, source: IntoFrameT):
    """
    Initialize GPX instance from dataframe.

    Every row becomes one track point; all points are stored in a
    single track segment of a single track.

    Args:
        source (IntoFrameT): Dataframe with "lat" and "lon" columns.
            Any other column named after a `Wpt` field ("ele", "time",
            "name", "link", ...) is used when present.
    """
    df = nw.from_native(source)
    columns = df.columns

    # `with_columns` returns a new frame, so the result must be kept:
    # the parsed datetimes were previously thrown away. Only text columns
    # are parsed; other dtypes are passed through unchanged.
    if "time" in columns and df.schema["time"] == nw.String:
        df = df.with_columns(nw.col("time").str.to_datetime())

    def parse_links(raw):
        """Convert the textual link value of one row into instances."""
        # Missing values and already-built objects are left alone
        if not isinstance(raw, str):
            return raw
        if raw.startswith("[") and raw.endswith("]"):
            raw = raw[1:-1]
        return list(map(instance_from_str, split_attributes(raw)))

    # Links are converted row by row while building the points: the
    # conversion works on a single string, not on a whole column.
    trkpt = [
        Wpt(
            lat=row["lat"],
            lon=row["lon"],
            ele=row.get("ele"),
            time=row.get("time"),
            magvar=row.get("magvar"),
            geoidheight=row.get("geoidheight"),
            name=row.get("name"),
            cmt=row.get("cmt"),
            desc=row.get("desc"),
            src=row.get("src"),
            link=parse_links(row.get("link")),
            sym=row.get("sym"),
            type=row.get("type"),
            fix=row.get("fix"),
            sat=row.get("sat"),
            hdop=row.get("hdop"),
            vdop=row.get("vdop"),
            pdop=row.get("pdop"),
            ageofdgpsdata=row.get("ageofdgpsdata"),
            dgpsid=row.get("dgpsid"),
            extensions=row.get("extensions"),
            tag="trkpt",
        )
        for row in df.iter_rows(named=True)
    ]
    trkseg = Trkseg(trkpt=trkpt)
    trk = Trk(trkseg=[trkseg])

    self.gpx = Gpx("1.1", "ezGPX", trk=[trk])
    self.xmlns = {}
    self._ele_data = "ele" in columns
    self._time_data = "time" in columns
    self._precisions = DEFAULT_PRECISION_DICT
    self._time_format = DEFAULT_TIME_FORMAT
    self._extensions_fields = None
###############################################################################
#### Schemas ##################################################################
###############################################################################
[docs]
def check_xml_schema(self) -> bool:
    """
    Validate the source against the XML schema.

    Delegates to the module-level `check_xml_schema` helper with the
    source and the version of the GPX element.

    Returns:
        bool: Result of the helper (True if the file follows XML schemas).
    """
    source, version = self.source, self.gpx.version
    return check_xml_schema(source, version)
[docs]
def check_xml_extensions_schemas(self) -> bool:
    """
    Validate the source against the XML extension schemas.

    Delegates to the module-level `check_xml_extensions_schemas` helper.

    Returns:
        bool: Result of the helper (True if the file follows XML schemas).
    """
    source = self.source
    return check_xml_extensions_schemas(source)
###############################################################################
#### Metadata #################################################################
###############################################################################
# def get_file_name(self) -> str | None:
# """
# Return the name of the GPX file.
# """
# if self.gpx.metadata:
# return self.gpx.metadata.name
# return None
# def set_file_name(self, new_name: str) -> None:
# """
# Set the name of the GPX file.
# Args:
# new_name (str): New name of the GPX file.
# """
# if self.gpx.metadata:
# self.gpx.metadata.name = new_name
# self.gpx.metadata = Metadata(name=new_name)
###############################################################################
#### Points ###################################################################
###############################################################################
[docs]
def trkpt_count(self) -> int:
    """
    Count the track points of every segment of every track.
    """
    return sum(
        len(segment.trkpt)
        for trk in self.gpx.trk
        for segment in trk.trkseg
    )
[docs]
def trkpt_bounds(self) -> tuple[Latitude, Longitude, Latitude, Longitude]:
    """
    Compute the bounding box of all track points.

    Returns:
        Tuple[Latitude, Longitude, Latitude, Longitude]: Min
            latitude, min longitude, max latitude, max longitude.
    """
    # Seed every bound with the very first track point
    seed = self.gpx.trk[0].trkseg[0].trkpt[0]
    lowest_lat = highest_lat = seed.lat
    lowest_lon = highest_lon = seed.lon

    for trk in self.gpx.trk:
        for segment in trk.trkseg:
            for point in segment.trkpt:
                if point.lat < lowest_lat:
                    lowest_lat = point.lat
                if point.lon < lowest_lon:
                    lowest_lon = point.lon
                if point.lat > highest_lat:
                    highest_lat = point.lat
                if point.lon > highest_lon:
                    highest_lon = point.lon

    return lowest_lat, lowest_lon, highest_lat, highest_lon
[docs]
def trkpt_center(self) -> tuple[Latitude, Longitude]:
    """
    Compute the center of the bounding box of all track points.

    Returns:
        tuple[Latitude, Longitude]: Latitude and longitude halfway
            between the bounds returned by `trkpt_bounds`.
    """
    min_lat, min_lon, max_lat, max_lon = self.trkpt_bounds()

    lat_span = max_lat.value - min_lat.value
    lon_span = max_lon.value - min_lon.value

    return (
        Latitude(min_lat.value + lat_span / 2),
        Longitude(min_lon.value + lon_span / 2),
    )
[docs]
def trkpt_extreme(self) -> tuple[Wpt, Wpt, Wpt, Wpt]:
    """
    Find the track points lying on the edges of the bounding box.

    When several points share an extreme value, the first one
    encountered is kept.

    Returns:
        tuple[Wpt, Wpt, Wpt, Wpt]: Min latitude point, min longitude
            point, max latitude point, max longitude point.
    """
    seed = self.gpx.trk[0].trkseg[0].trkpt[0]
    lowest_lat = lowest_lon = highest_lat = highest_lon = seed

    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        if point.lat < lowest_lat.lat:
            lowest_lat = point
        if point.lon < lowest_lon.lon:
            lowest_lon = point
        if point.lat > highest_lat.lat:
            highest_lat = point
        if point.lon > highest_lon.lon:
            highest_lon = point

    return lowest_lat, lowest_lon, highest_lat, highest_lon
###############################################################################
#### Distance and Elevation ###################################################
###############################################################################
[docs]
def distance(self) -> float:
    """
    Sum the distance between consecutive track points (in meters).

    Points are chained across segments and tracks, so the gap between
    the end of a segment and the start of the next one is included.
    """
    total = 0.0
    last_seen = self.gpx.trk[0].trkseg[0].trkpt[0]
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for current in every_point:
        total += haversine_distance(last_seen, current)
        last_seen = current
    return total
# from itertools import pairwise
# def distance_bis(self) -> float:
# dst = 0.0
# for track in self.gpx.trk:
# for track_segment in track.trkseg:
# for p1, p2 in self.pairwise(track_segment.trkpt):
# dst += haversine_distance(p1, p2)
# return dst
def _compute_distance_from_start(self):
    """
    Store on every track point its cumulative distance from the first
    track point (attribute `distance_from_start`).

    Does nothing when the values were already computed.
    """
    if self._dist_from_start:
        return

    travelled = 0.0
    last_seen = self.gpx.trk[0].trkseg[0].trkpt[0]
    last_seen.distance_from_start = travelled

    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for current in every_point:
        travelled += haversine_distance(last_seen, current)
        current.distance_from_start = travelled
        last_seen = current

    # Remember that the values are available
    self._dist_from_start = True
[docs]
def ascent(self) -> float:
    """
    Sum every elevation gain between consecutive track points
    (in meters).
    """
    climbed = 0
    last_ele = self.gpx.trk[0].trkseg[0].trkpt[0].ele
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        if point.ele > last_ele:
            climbed += point.ele - last_ele
        last_ele = point.ele
    return climbed
[docs]
def descent(self) -> float:
    """
    Sum every elevation loss between consecutive track points
    (in meters).
    """
    dropped = 0
    last_ele = self.gpx.trk[0].trkseg[0].trkpt[0].ele
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        if point.ele < last_ele:
            dropped += last_ele - point.ele
        last_ele = point.ele
    return dropped
[docs]
def min_elevation(self) -> float:
    """
    Find the lowest elevation among all track points (in meters).
    """
    lowest = self.gpx.trk[0].trkseg[0].trkpt[0].ele
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        if point.ele < lowest:
            lowest = point.ele
    return lowest
[docs]
def max_elevation(self) -> float:
    """
    Find the highest elevation among all track points (in meters).
    """
    highest = self.gpx.trk[0].trkseg[0].trkpt[0].ele
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        if point.ele > highest:
            highest = point.ele
    return highest
def _compute_ascent_rate(self) -> None:
    """
    Store on every track point its ascent rate: the elevation
    difference with the previous point, times 100, divided by the
    distance to it (attribute `ascent_rate`).

    Does nothing when the values were already computed.
    """
    if self._ascent_rate:
        return

    last_seen = self.gpx.trk[0].trkseg[0].trkpt[0]
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for current in every_point:
        run = haversine_distance(last_seen, current)
        rise = current.ele - last_seen.ele
        try:
            rate = (rise * 100) / run
        except ZeroDivisionError:
            # No horizontal movement: the rate is defined as 0
            rate = 0.0
        current.ascent_rate = rate
        last_seen = current

    # Remember that the values are available
    self._ascent_rate = True
[docs]
def max_descent_rate(self) -> float:
    """
    Return the smallest ascent rate found on the track points.

    The search starts from 100.0, which is therefore the result when
    no point has a lower rate.
    """
    self._compute_ascent_rate()
    rates = [
        point.ascent_rate
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    ]
    return min([100.0, *rates])
[docs]
def max_ascent_rate(self) -> float:
    """
    Return the largest ascent rate found on the track points.

    The search starts from -1.0, which is therefore the result when
    no point has a higher rate.
    """
    self._compute_ascent_rate()
    rates = [
        point.ascent_rate
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    ]
    return max([-1.0, *rates])
###############################################################################
#### Time #####################################################################
###############################################################################
def _compute_time_zone(self) -> str:
    """
    Look up the time zone at the first track point, cache it in
    `self._time_zone` and return it.

    Returns:
        str: Time zone name given by `TimezoneFinder.timezone_at`.
            NOTE(review): the lookup presumably yields None when no
            zone matches the coordinates - confirm.
    """
    first_point = self.gpx.trk[0].trkseg[0].trkpt[0]
    self._time_zone = TimezoneFinder().timezone_at(
        lat=first_point.lat.value,
        lng=first_point.lon.value,
    )
    # Honour the declared return type (nothing was returned before)
    return self._time_zone
[docs]
def time_zone(self) -> str:
    """
    Return the cached time zone, computing it first when the cache is
    still empty.
    """
    already_known = bool(self._time_zone)
    if not already_known:
        self._compute_time_zone()
    return self._time_zone
[docs]
def start_time(self, utc: bool = False) -> datetime | None:
    """
    Return the time of the first track point.

    Args:
        utc (bool, optional): When True, the stored time is returned
            untouched; otherwise it is converted to the time zone given
            by `time_zone`. Defaults to False.

    Returns:
        datetime | None: Start time or None if no time data.
    """
    moment = self.gpx.trk[0].trkseg[0].trkpt[0].time
    if utc or not moment:
        return moment
    local_zone = ZoneInfo(self.time_zone())
    return moment.astimezone(local_zone)
[docs]
def stop_time(self, utc: bool = False) -> datetime | None:
    """
    Return the time of the last track point.

    Args:
        utc (bool, optional): When True, the stored time is returned
            untouched; otherwise it is converted to the time zone given
            by `time_zone`. Defaults to False.

    Returns:
        datetime | None: Stop time or None if no time data.
    """
    moment = self.gpx.trk[-1].trkseg[-1].trkpt[-1].time
    if utc or not moment:
        return moment
    local_zone = ZoneInfo(self.time_zone())
    return moment.astimezone(local_zone)
[docs]
def total_elapsed_time(self) -> timedelta | None:
    """
    Return the total elapsed time.

    Returns:
        timedelta | None: Difference between `stop_time` and
            `start_time`, or None (after emitting a warning) when the
            subtraction raises TypeError, e.g. when a time is missing.
    """
    try:
        return self.stop_time() - self.start_time()
    except TypeError:
        warnings.warn("Unable to compute activity total elapsed time")
        return None
[docs]
def stopped_time(self, tolerance: float = 2.45) -> timedelta:
    """
    Return the stopped time.

    The time between two consecutive track points counts as stopped
    when the distance between them is below `tolerance`. Points are
    chained across segments and tracks.

    Args:
        tolerance (float, optional): Distance between two consecutive
            points below which no movement is assumed. Defaults to 2.45.

    Returns:
        timedelta: Stopped time.
    """
    stopped = timedelta()
    previous_point = self.gpx.trk[0].trkseg[0].trkpt[0]
    for track in self.gpx.trk:
        for segment in track.trkseg:
            for point in segment.trkpt:
                if haversine_distance(previous_point, point) < tolerance:
                    stopped += point.time - previous_point.time
                previous_point = point
    return stopped
[docs]
def moving_time(self) -> timedelta:
    """
    Return the moving time.

    Returns:
        timedelta: Total elapsed time minus stopped time.
    """
    elapsed = self.total_elapsed_time()
    return elapsed - self.stopped_time()
###############################################################################
#### Speed and Pace ###########################################################
###############################################################################
def _compute_speed(self) -> None:
    """
    Store on every track point its speed relative to the previous
    point, in kilometers per hour (attribute `speed`).

    Does nothing when the values were already computed.
    """
    if self._speed:
        return

    last_seen = self.gpx.trk[0].trkseg[0].trkpt[0]
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for current in every_point:
        # Distance in kilometers
        kilometers = haversine_distance(last_seen, current) / 1000
        # Time in hours
        elapsed = current.time - last_seen.time
        hours = elapsed.total_seconds() / 3600
        try:
            velocity = kilometers / hours
        except ZeroDivisionError:
            # Identical timestamps: the speed is defined as 0
            velocity = 0.0
        current.speed = velocity
        last_seen = current

    # Remember that the values are available
    self._speed = True
[docs]
def avg_speed(self, moving: bool = False) -> float:
    """
    Return the average speed (in kilometers per hour).

    Args:
        moving (bool, optional): Moving flag. Defaults to False.

    Returns:
        float: Average moving speed if `moving` is True, average
            speed otherwise. 0.0 when the considered time is zero.
    """
    kilometers = self.distance() / 1000  # Distance in kilometers
    elapsed = self.moving_time() if moving else self.total_elapsed_time()
    hours = elapsed.total_seconds() / 3600  # Time in hours
    try:
        return kilometers / hours
    except ZeroDivisionError:
        # Same convention as the per-point speed: no time, no speed
        return 0.0
[docs]
def min_speed(self) -> float:
    """
    Return the smallest speed found on the track points (in
    kilometers per hour).

    The search starts from 1000.0, which is therefore the result when
    no point has a lower speed.
    """
    self._compute_speed()
    slowest = 1_000.0
    for trk in self.gpx.trk:
        for segment in trk.trkseg:
            for point in segment.trkpt:
                if point.speed < slowest:
                    slowest = point.speed
    return slowest
[docs]
def max_speed(self) -> float:
    """
    Return the largest speed found on the track points (in kilometers
    per hour).

    The search starts from -1.0, which is therefore the result when no
    point has a higher speed.
    """
    self._compute_speed()
    fastest = -1.0
    for trk in self.gpx.trk:
        for segment in trk.trkseg:
            for point in segment.trkpt:
                if point.speed > fastest:
                    fastest = point.speed
    return fastest
def _compute_pace(self) -> None:
"""
Compute pace (in minutes per kilometer) at each track point.
"""
if self._pace:
return
self._compute_speed()
for track in self.gpx.trk:
for segment in track.trkseg:
for point in segment.trkpt:
try:
point.pace = 60.0 / point.speed
except ZeroDivisionError:
point.pace = 0.0
self._pace = True
[docs]
def avg_pace(self, moving: bool = False) -> float:
    """
    Return the average pace (in minutes per kilometer).

    Args:
        moving (bool, optional): Moving flag. Defaults to False.

    Returns:
        float: Average moving pace if `moving` is True, average
            pace otherwise. 0.0 when the average speed is zero.
    """
    try:
        return 60.0 / self.avg_speed(moving)
    except ZeroDivisionError:
        # Same convention as the per-point pace: no speed, no pace
        return 0.0
[docs]
def min_pace(self) -> float:
    """
    Return the smallest pace found on the track points (in minutes
    per kilometer).

    The search starts from 1000.0, which is therefore the result when
    no point has a lower pace.
    """
    self._compute_pace()
    paces = [
        point.pace
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    ]
    return min([1000.0, *paces])
[docs]
def max_pace(self) -> float:
    """
    Return the largest pace found on the track points (in minutes per
    kilometer).

    The search starts from -1.0, which is therefore the result when no
    point has a higher pace.
    """
    self._compute_pace()
    paces = [
        point.pace
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    ]
    return max([-1.0, *paces])
def _compute_ascent_speed(self) -> None:
"""
Compute ascent speed (in kilometers per hour) at each track point.
"""
if self._ascent_speed:
return
previous_point = self.gpx.trk[0].trkseg[0].trkpt[0]
for track in self.gpx.trk:
for track_segment in track.trkseg:
for track_point in track_segment.trkpt:
ascent = track_point.ele - previous_point.ele
# Convert to hours
time = (
track_point.time - previous_point.time
).total_seconds() / 3600
try:
track_point.ascent_speed = ascent / time
except ZeroDivisionError:
track_point.ascent_speed = 0.0
previous_point = track_point
self._ascent_speed = True
[docs]
def min_ascent_speed(self) -> float:
    """
    Return the smallest ascent speed found on the track points (in
    meters per hour).

    The search starts from 1000.0, which is therefore the result when
    no point has a lower ascent speed.
    """
    self._compute_ascent_speed()
    lowest = 1000.0
    for trk in self.gpx.trk:
        for segment in trk.trkseg:
            for point in segment.trkpt:
                if point.ascent_speed < lowest:
                    lowest = point.ascent_speed
    return lowest
[docs]
def max_ascent_speed(self) -> float:
    """
    Return the largest ascent speed found on the track points (in
    meters per hour).

    The search starts from -1.0, which is therefore the result when no
    point has a higher ascent speed.
    """
    self._compute_ascent_speed()
    highest = -1.0
    for trk in self.gpx.trk:
        for segment in trk.trkseg:
            for point in segment.trkpt:
                if point.ascent_speed > highest:
                    highest = point.ascent_speed
    return highest
###############################################################################
#### Data Removal #############################################################
###############################################################################
[docs]
def remove_elevation(self):
    """
    Set the elevation of every track point to None.
    """
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        point.ele = None
[docs]
def remove_time(self):
    """
    Set the time of every track point to None.
    """
    every_point = (
        point
        for trk in self.gpx.trk
        for segment in trk.trkseg
        for point in segment.trkpt
    )
    for point in every_point:
        point.time = None
[docs]
def remove_extensions(self):
    """
    Remove extensions data.

    Extensions are cleared on the gpx element, its metadata (when
    present), waypoints, routes, tracks, track segments and track
    points.
    """
    self.gpx.extensions = None  # Remove extensions from gpx

    # Remove extensions from metadata; metadata may be absent, in
    # which case there is nothing to clear
    if self.gpx.metadata is not None:
        self.gpx.metadata.extensions = None

    # Remove extensions from waypoints
    for wpt in self.gpx.wpt or []:
        wpt.extensions = None

    # Remove extensions from routes
    for rte in self.gpx.rte or []:
        rte.extensions = None

    # Remove extensions from tracks, track segments and track points
    for trk in self.gpx.trk or []:
        trk.extensions = None
        for trkseg in trk.trkseg or []:
            trkseg.extensions = None
            for trkpt in trkseg.trkpt or []:
                trkpt.extensions = None
###############################################################################
#### Error Correction #########################################################
###############################################################################
[docs]
def remove_gps_errors(self, error_distance: float = 100) -> list:
    """
    Remove GPS errors.

    A track point is considered a GPS error when it lies more than
    `error_distance` away from the last point that was kept. The very
    first point is always kept.

    Args:
        error_distance (float, optional): Error threshold distance
            (in meters) between two points. Defaults to 100.

    Returns:
        list: List of removed points (GPS errors).
    """
    previous_point = None
    gps_errors = []

    for track in self.gpx.trk:
        for track_segment in track.trkseg:
            kept_points = []
            for track_point in track_segment.trkpt:
                # The distance is only computed once a reference point
                # exists: there is nothing to compare the first point to
                is_error = (
                    previous_point is not None
                    and haversine_distance(previous_point, track_point)
                    > error_distance
                )  # TODO use Z-score?
                if is_error:
                    gps_errors.append(track_point)
                else:
                    kept_points.append(track_point)
                    # Only valid points become the next reference
                    previous_point = track_point
            track_segment.trkpt = kept_points

    return gps_errors
###############################################################################
#### Simplification ###########################################################
###############################################################################
[docs]
def remove_points(self, reduction_factor: int = 2):
    """
    Thin out every track segment naively: counting points from 1, only
    those whose rank is a multiple of `reduction_factor` are kept.

    Note: For a more advanced processing, consider using the
    `simplify` method.

    Args:
        reduction_factor (int, optional): Reduction factor.
            The number of points will be divided by this value.
            Defaults to 2.
    """
    every_segment = (
        segment
        for trk in self.gpx.trk
        for segment in trk.trkseg
    )
    for segment in every_segment:
        survivors = []
        for index, point in enumerate(segment.trkpt):
            rank = index + 1
            if rank % reduction_factor == 0:
                survivors.append(point)
        segment.trkpt = survivors
[docs]
def simplify(self, tolerance: float = 2):
    """
    Simplify every track segment with the Ramer-Douglas-Peucker
    algorithm.

    Args:
        tolerance (float, optional): Minimum distance (in meters)
            between the point and the track if the point were removed.
            Defaults to 2.
    """
    # The tolerance is turned into an angle (in degrees) before being
    # given to the algorithm
    angle = tolerance / EARTH_RADIUS
    epsilon = degrees(angle)

    every_segment = (
        segment
        for trk in self.gpx.trk
        for segment in trk.trkseg
    )
    for segment in every_segment:
        segment.trkpt = ramer_douglas_peucker(segment.trkpt, epsilon)
###############################################################################
#### Reverse ##################################################################
###############################################################################
[docs]
def reverse(self):
    """
    Reverse the GPX in place: the order of tracks, of the segments of
    each track and of the points of each segment.
    """
    tracks = self.gpx.trk
    tracks.reverse()
    for trk in tracks:
        segments = trk.trkseg
        segments.reverse()
        for segment in segments:
            segment.trkpt.reverse()
###############################################################################
#### Merge ####################################################################
###############################################################################
[docs]
@staticmethod
def merge(*gpxs: GPX) -> GPX:
    """
    Merge GPX objects into a new instance.

    Namespaces, schema locations and extension fields are combined
    (later arguments win on identical keys); waypoints, routes and
    tracks are concatenated in argument order. Metadata is not merged.

    Args:
        *gpxs (GPX): GPX objects to merge.

    Returns:
        GPX: Merged GPX (new instance).
    """
    new_gpx = GPX()  # New GPX instance
    new_gpx.xmlns = {"": "http://www.topografix.com/GPX/1/1"}
    new_gpx.xsi_schema_location = {
        "http://www.topografix.com/GPX/1/1": "http://www.topografix.com/GPX/1/1/gpx.xsd"
    }
    new_gpx._extensions_fields = {}
    new_gpx.gpx.version = "1.1"
    new_gpx.gpx.creator = "ezGPX"
    new_gpx.gpx.wpt = []
    new_gpx.gpx.rte = []
    new_gpx.gpx.trk = []

    new_extensions = {}

    for gpx in gpxs:
        # Instances built from KML, FIT or dataframes hold None instead
        # of a dict for their extension fields: treat None as empty so
        # that such instances can be merged too.
        new_gpx.xmlns |= gpx.xmlns or {}
        new_gpx.xsi_schema_location |= gpx.xsi_schema_location or {}
        new_gpx._extensions_fields |= gpx._extensions_fields or {}
        # TODO how to merge metadata?
        if gpx.gpx.wpt:
            new_gpx.gpx.wpt.extend(gpx.gpx.wpt)
        if gpx.gpx.rte:
            new_gpx.gpx.rte.extend(gpx.gpx.rte)
        if gpx.gpx.trk:
            new_gpx.gpx.trk.extend(gpx.gpx.trk)
        if gpx.gpx.extensions:
            new_extensions |= gpx.gpx.extensions.values

    new_gpx.gpx.extensions = Extensions(new_extensions)
    return new_gpx
###############################################################################
#### Exports ##################################################################
###############################################################################
[docs]
def to_dict(self, values: list[str] = None, as_series: bool = False) -> dict:
    """
    Convert GPX object to dictionary (similar to Polars `to_dict`).

    The Polars dataframe built by `to_polars` is converted with its
    own `to_dict` method.

    Args:
        values (list[str], optional): List of values to write.
            Supported values: "lat", "lon", "ele", "time", "speed",
            "pace", "ascent_rate", "ascent_speed",
            "distance_from_start". Defaults to None.
        as_series (bool, optional): True -> Values are Series,
            False -> Values are list[Any]. Defaults to False.

    Returns:
        dict: Dictionary representing the GPX.
    """
    frame = self.to_polars(values)
    return frame.to_dict(as_series=as_series)
[docs]
def to_dicts(self, values: list[str] = None) -> list[dict[str, Any]]:
    """
    Convert GPX object to list of dictionaries (similar to Polars
    `to_dicts`).

    The Polars dataframe built by `to_polars` is converted with its
    own `to_dicts` method.

    Args:
        values (list[str], optional): List of values to write.
            Supported values: "lat", "lon", "ele", "time", "speed",
            "pace", "ascent_rate", "ascent_speed",
            "distance_from_start". Defaults to None.

    Returns:
        list[dict[str, Any]]: List of dictionaries representing the GPX.
    """
    frame = self.to_polars(values)
    return frame.to_dicts()
def _to_dict_df(self, values: list[str] = None) -> dict:
"""
Convert GPX object to dictionary.
Args:
values (list[str], optional): List of values to write.
Supported values: "lat", "lon", "ele", "time", "speed",
"pace", "ascent_rate", "ascent_speed",
"distance_from_start". Defaults to None.
Returns:
dict: Dictionary containing data from GPX.
"""
# Set default parameter
if values is None:
values = ["lat", "lon"]
# Compute required values
test_point = self.gpx.trk[0].trkseg[0].trkpt[0]
if "speed" in values and test_point.speed is None:
self._compute_speed()
if "pace" in values and test_point.pace is None:
self._compute_pace()
if "ascent_rate" in values and test_point.ascent_rate is None:
self._compute_ascent_rate()
if "ascent_speed" in values and test_point.ascent_speed is None:
self._compute_ascent_speed()
if "distance_from_start" in values and test_point.distance_from_start is None:
self._compute_distance_from_start()
# Create dataframe
gpx_data = {}
for v in values:
if v in ["lat", "lon", "magvar", "fix", "dgpsid"]:
gpx_data[v] = [
getattr(trkpt, v).value
for trk in self.gpx.trk
for trkseg in trk.trkseg
for trkpt in trkseg.trkpt
]
elif v == "link":
gpx_data["link"] = [
", ".join(map(str, trkpt.link))
for trk in self.gpx.trk
for trkseg in trk.trkseg
for trkpt in trkseg.trkpt
]
else:
gpx_data[v] = [
getattr(trkpt, v)
for trk in self.gpx.trk
for trkseg in trk.trkseg
for trkpt in trkseg.trkpt
]
return gpx_data
[docs]
def to_pandas(self, values: list[str] = None) -> pd.DataFrame:
    """
    Convert GPX object to Pandas Dataframe.

    Time related values are dropped (with a warning) when the GPX holds
    no time data, and elevation related values are dropped (with a
    warning) when it holds no elevation data. The list given by the
    caller is never modified.

    Args:
        values (list[str], optional): List of values to write.
            Supported values: "lat", "lon", "ele", "time", "speed",
            "pace", "ascent_rate", "ascent_speed",
            "distance_from_start". Defaults to None, i.e. "lat" and
            "lon".

    Returns:
        pd.DataFrame: Dataframe containing data from GPX.
    """
    # Work on a copy: filtering must not alter the caller's list
    values = ["lat", "lon"] if values is None else list(values)

    # Disable time related values if no time data available
    if not self._time_data and any(v in self.TIME_VALUES for v in values):
        warnings.warn(
            f"Trying to create dataframe from GPX file {self.source} "
            "which does not contain time data. Time related values "
            "(time, speed, pace, ascent speed) will not be present in "
            "the dataframe.",
            UserWarning,
        )
        values = [v for v in values if v not in self.TIME_VALUES]

    # Disable elevation related values if no elevation data available
    if not self._ele_data and any(v in self.ELEVATION_VALUES for v in values):
        warnings.warn(
            f"Trying to create dataframe from GPX file {self.source} "
            "which does not contain elevation data. Elevation related "
            "values (elevation, ascent rate, ascent speed) will not "
            "be present in the dataframe.",
            UserWarning,
        )
        values = [v for v in values if v not in self.ELEVATION_VALUES]

    return pd.DataFrame(self._to_dict_df(values))
[docs]
def to_polars(self, values: Optional[list[str]] = None) -> pl.DataFrame:
    """
    Convert GPX object to Polars Dataframe.

    Values that cannot be provided are dropped before the dataframe is
    built: time related values (``GPX.TIME_VALUES``) when the GPX holds
    no time data, and elevation related values
    (``GPX.ELEVATION_VALUES``) when it holds no elevation data. A
    ``UserWarning`` is emitted whenever a requested value is dropped.

    Note:
        A list passed as ``values`` is filtered in place, so after the
        call it only contains the values that were actually exported.
        Callers may use the filtered list to select columns afterwards.

    Args:
        values (list[str], optional): List of values to write.
            Supported values: "lat", "lon", "ele", "time", "speed",
            "pace", "ascent_rate", "ascent_speed",
            "distance_from_start". Defaults to None, in which case
            ["lat", "lon"] is used.

    Returns:
        pl.DataFrame: Dataframe containing data from GPX.
    """
    # Set default parameter
    if values is None:
        values = ["lat", "lon"]

    # Disable time related values if no time data available
    if not self._time_data and any(v in GPX.TIME_VALUES for v in values):
        warnings.warn(
            f"Trying to create dataframe from GPX file {self.source}\n"
            "which does not contain time data. Time related values\n"
            "(time, speed, pace, ascent speed) will not be present in\n"
            "the dataframe.",
            UserWarning,
        )
        # Slice assignment keeps the filtering in place (same list object)
        # and, unlike list.remove, also drops repeated entries.
        values[:] = [v for v in values if v not in GPX.TIME_VALUES]

    # Disable elevation related values if no elevation data available
    if not self._ele_data and any(v in GPX.ELEVATION_VALUES for v in values):
        warnings.warn(
            f"Trying to create dataframe from GPX file {self.source}\n"
            "which does not contain elevation data. Elevation related\n"
            "values (elevation, ascent rate, ascent speed) will not\n"
            "be present in the dataframe.",
            UserWarning,
        )
        values[:] = [v for v in values if v not in GPX.ELEVATION_VALUES]

    return pl.DataFrame(self._to_dict_df(values))
[docs]
def to_csv(
    self,
    dest: Optional[str | Path | IO[str] | IO[bytes] | bytes] = None,
    values: list[str] = None,
    **kwargs,
) -> str | None:
    """
    Export the track points of the GPX object as CSV.

    The data is first converted with ``to_polars`` and the resulting
    columns are then selected in the order given by ``values``.

    Args:
        dest (str | Path | IO[str] | IO[bytes] | bytes, optional):
            Path to a file or a file-like object to write in. A
            ``bytes`` value is wrapped in a new ``io.BytesIO`` before
            being handed to the writer. Defaults to None.
        values (list[str], optional): list of values to write.
            Supported values: "lat", "lon", "ele", "time", "speed",
            "pace", "ascent_rate", "ascent_speed",
            "distance_from_start". Defaults to None, in which case
            ["lat", "lon"] is used.
        **kwargs: Extra keyword arguments forwarded to the Polars
            ``write_csv`` call.

    Returns:
        str | None: CSV like string if path is set to None.
    """
    columns = ["lat", "lon"] if values is None else values
    target = io.BytesIO(dest) if isinstance(dest, bytes) else dest

    # to_polars prunes unavailable entries from `columns` in place, so the
    # selection below only names columns that exist in the frame.
    frame = self.to_polars(columns)

    # Re-select so the column order follows the requested values order.
    ordered = frame.select(columns)
    return ordered.write_csv(target, datetime_format=self._time_format, **kwargs)
[docs]
def to_gpx(
    self,
    dest: Optional[str | Path | IO[str] | IO[bytes] | bytes] = None,
    *,
    bounds_fields: Optional[list[str]] = None,
    copyright_fields: Optional[list[str]] = None,
    email_fields: Optional[list[str]] = None,
    extensions_fields: Optional[dict] = None,
    gpx_fields: Optional[list[str]] = None,
    link_fields: Optional[list[str]] = None,
    metadata_fields: Optional[list[str]] = None,
    person_fields: Optional[list[str]] = None,
    ptseg_fields: Optional[list[str]] = None,
    pt_fields: Optional[list[str]] = None,
    rte_fields: Optional[list[str]] = None,
    trkseg_fields: Optional[list[str]] = None,
    trk_fields: Optional[list[str]] = None,
    wpt_fields: Optional[list[str]] = None,
    trkpt_fields: Optional[list[str]] = None,
    mandatory_fields: bool = True,
) -> str | None:
    """
    Serialise the GPX object to GPX through ``GPXWriter``.

    Every ``*_fields`` argument and ``mandatory_fields`` is handed over
    to ``GPXWriter.write`` under the same keyword name; their exact
    meaning is defined by the writer.

    Args:
        dest (str | Path | IO[str] | IO[bytes] | bytes, optional):
            Path to a file or a file-like object to write in.
            Defaults to None.
        bounds_fields (Optional[list[str]], optional): Forwarded to
            the writer. Defaults to None.
        copyright_fields (Optional[list[str]], optional): Forwarded to
            the writer. Defaults to None.
        email_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        extensions_fields (Optional[dict], optional): Forwarded to the
            writer. When None or empty, the ``_extensions_fields``
            attribute of this object is used instead. Defaults to None.
        gpx_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        link_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        metadata_fields (Optional[list[str]], optional): Forwarded to
            the writer. Defaults to None.
        person_fields (Optional[list[str]], optional): Forwarded to
            the writer. Defaults to None.
        ptseg_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        pt_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        rte_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        trkseg_fields (Optional[list[str]], optional): Forwarded to
            the writer. Defaults to None.
        trk_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        wpt_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        trkpt_fields (Optional[list[str]], optional): Forwarded to the
            writer. Defaults to None.
        mandatory_fields (bool, optional): Forwarded to the writer.
            Defaults to True.

    Returns:
        str | None: GPX like string if path is set to None.
    """
    # Fall back to the extensions stored on this object when the caller
    # gives none (or an empty mapping).
    effective_extensions = extensions_fields or self._extensions_fields

    writer_options = {
        "bounds_fields": bounds_fields,
        "copyright_fields": copyright_fields,
        "email_fields": email_fields,
        "extensions_fields": effective_extensions,
        "gpx_fields": gpx_fields,
        "link_fields": link_fields,
        "metadata_fields": metadata_fields,
        "person_fields": person_fields,
        "ptseg_fields": ptseg_fields,
        "pt_fields": pt_fields,
        "rte_fields": rte_fields,
        "trkseg_fields": trkseg_fields,
        "trk_fields": trk_fields,
        "wpt_fields": wpt_fields,
        "trkpt_fields": trkpt_fields,
        "mandatory_fields": mandatory_fields,
    }
    writer = GPXWriter(self)
    return writer.write(file_path=dest, **writer_options)
[docs]
def to_kml(
    self,
    dest: Optional[str | Path | IO[str] | IO[bytes] | bytes] = None,
    *,
    styles: Optional[list[tuple[str, dict]]] = None,
) -> str | None:
    """
    Serialise the GPX object to KML through ``KMLWriter``.

    Args:
        dest (str | Path | IO[str] | IO[bytes] | bytes, optional):
            Path to a file or a file-like object to write in.
            Defaults to None.
        styles (Optional[list[tuple[str, dict]]], optional): KML
            styles, passed to the writer as is. Defaults to None.

    Returns:
        str | None: KML like string if path is set to None.
    """
    writer = KMLWriter(self)
    # Both arguments are passed positionally, as the writer is called with
    # (destination, styles).
    return writer.write(dest, styles)