Привет! Сегодня хочу поделиться интересным проектом, который мы сделали для конкурса. Задача — превратить сырые GPX-треки (треки с GPS-устройств) в структурированные данные с визуализацией, метеорологической и географической аналитикой. Всё это — на Python, с использованием открытых API и библиотек для работы с геоданными.
Что делает проект?
Код представляет собой пайплайн обработки GPS-треков, который:
-
Скачивает GPX-файлы по ссылкам
-
Визуализирует треки на карте
-
Извлекает данные о каждой точке (координаты, время, высота)
-
Добавляет погодные данные (температуру) в момент записи трека
-
Определяет регион и тип местности
-
Рассчитывает частоту шагов (для пеших походов)
-
Аугментирует изображения треков для расширения датасета
-
Визуализирует распределения данных
Загрузка и парсинг GPX
import requests
import pandas as pd
import os
def download_gpx(links):
"""
Скачивает GPX-файлы по списку ссылок
"""
for num, url in enumerate(links):
try:
response = requests.get(url)
filename = f"track{num}.gpx"
with open(f"data/gpx/{filename}", mode="wb") as f:
f.write(response.content)
except Exception as e:
print("Ошибка при скачивании")
# Инициализация DataFrame для хранения всех точек
df = pd.DataFrame(columns=["track_id", "track_time", "latitude", "longitude", "altitude"])
Визуализация треков на картах
import gpxpy
import geopandas as gpd
import contextily as ctx
from shapely.geometry import box, LineString
import matplotlib.pyplot as plt
def gpx_to_png(df):
"""
Конвертирует GPX-файлы в изображения карт с треками
"""
margin = 0.02
img_path = "data/image"
os.makedirs(img_path, exist_ok=True)
for i in os.listdir("data/gpx"):
png_name = f"{i[:-4]}.png"
with open(f"data/gpx/{i}", encoding="UTF-8") as f:
gpx = gpxpy.parse(f)
lats, lons = [], []
for track in gpx.tracks:
for segment in track.segments:
for point in segment.points:
lats.append(point.latitude)
lons.append(point.longitude)
df.loc[len(df)] = [i, point.time, point.latitude, point.longitude, point.elevation]
# Создаем bounding box с отступами
bbox = box(
min(lons) - margin,
min(lats) - margin,
max(lons) + margin,
max(lats) + margin
)
track_line = LineString(zip(lons, lats))
# Конвертируем в Web Mercator для отображения
gdf_bbox = gpd.GeoDataFrame(geometry=[bbox], crs="EPSG:4326")
gdf_bbox_web = gdf_bbox.to_crs(epsg=3857)
gdf_track = gpd.GeoDataFrame(geometry=[track_line], crs="EPSG:4326")
gdf_track_web = gdf_track.to_crs(epsg=3857)
# Создаем карту
fig, ax = plt.subplots(figsize=(10, 8))
gdf_bbox_web.plot(ax=ax, alpha=0)
gdf_track_web.plot(ax=ax, color="red", linewidth=2)
ctx.add_basemap(ax, crs=gdf_bbox_web.crs, source=ctx.providers.OpenStreetMap.Mapnik)
ax.set_axis_off()
plt.savefig(f"{img_path}/{png_name}", dpi=150, bbox_inches="tight", pad_inches=0)
plt.close(fig)
return df
Получение исторических погодных данных
def temp(lat, lon, date):
"""
Получает температуру для заданных координат и даты
"""
url = "https://archive-api.open-meteo.com/v1/archive"
params = {
'latitude': lat, # Широта
'longitude': lon, # Долгота
'start_date': date,
'end_date': date,
'hourly': 'temperature_2m',
"timezone":"auto"
}
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'}
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
if response.status_code == 200:
data = response.json()
return data["hourly"]['temperature_2m'][12] # Температура в полдень
else:
print(f"Данные не получены")
def analysis_weather(df):
'''
Функция для интерполяции температур между опорными точками.
Использует 5 опорных точек для интерполяции температуры для всех строк.
'''
n = len(df)
key_indexes = [0, n//4, n//2, 3*n//4, n-1]
temperatures_at_key_points = {}
for idx in key_indexes:
lat = df.iloc[idx]["latitude"]
lon = df.iloc[idx]["longitude"]
date = df.iloc[idx]["track_time"]
temp_value = temp(lat, lon, date)
temperatures_at_key_points[idx] = temp_value
if len(temperatures_at_key_points) < 2:
print("Недостаточно данных для интерполяции, проверьте работоспособность API")
df["temperature"] = None
return df
all_temperatures = []
left_idx = 0
right_idx = 1
for i in range(n):
if (right_idx < len(key_indexes) - 1 and i >= key_indexes[right_idx]):
left_idx += 1
right_idx += 1
left_key = key_indexes[left_idx]
right_key = key_indexes[right_idx]
left_temp = temperatures_at_key_points[left_key]
right_temp = temperatures_at_key_points[right_key]
if i in temperatures_at_key_points:
temperature = temperatures_at_key_points[i]
elif left_temp is not None and right_temp is not None:
temperature = left_temp + (right_temp - left_temp) * (i - left_key) / (right_key - left_key)
else:
temperature = left_temp if left_temp is not None else right_temp
all_temperatures.append(temperature)
df = df.copy()
df["temperature"] = all_temperatures
return df
def get_temp(df):
"""
Обрабатывает температуру для всех треков
"""
try:
df_temp = pd.DataFrame()
for i in range(0, 3):
track_data = df[df["track_id"] == f"track{i}.gpx"]
track_data_weather = analysis_weather(track_data)
df_temp = pd.concat([df_temp, track_data_weather])
print(f"track{i} добавлен")
df = df_temp.copy()
return df
except Exception as e:
print(f"Ошибка вызова функции: analysis_weather {e}")
Географический анализ: регион и тип местности
import time
def extract_map_region(lat: float, lon: float):
"""
Определяет регион по координатам через Nominatim API
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/536.36 (KHTML, like Gecko) Chrome/58.0.3029.100 Safari/536.3'}
response = requests.get(f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json", headers=headers)
json = response.json()
time.sleep(1.5) # Уважаем лимиты API
if "county" in json["address"]:
return json["address"]["county"]
if "state" in json["address"]:
return json["address"]["state"]
return json["address"]["country"]
except Exception as e:
print(f"Error {e}")
def analysis_region(df):
'''
Функция для определения регионов между опорными точками.
'''
try:
lat = df.iloc[0]["latitude"]
lon = df.iloc[0]["longitude"]
df = df.copy()
df["region"] = extract_map_region(lat, lon)
return df
except Exception as e:
print(f"Ошибка вызова функции extract_map_region {e}")
def terrain_type(df):
"""
Определяет тип местности и ключевые объекты через Overpass API
"""
overpass_endpoints = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
"https://overpass.openstreetmap.ru/cgi/interpreter"
]
try:
points = list(zip(df["latitude"], df["longitude"]))
rep_points = [points[0], points[len(points)//2], points[-1]] # 3 опорные точки
all_landuse, all_natural, all_key_objects = [], [], set()
for lat, lon in rep_points:
overpass_query = f"""
[out:json][timeout:45];
(
way(around:500,{lat},{lon})["landuse"];
way(around:500,{lat},{lon})["natural"];
way(around:500,{lat},{lon})["leisure"];
way(around:500,{lat},{lon})["waterway"="river"];
way(around:500,{lat},{lon})["waterway"="stream"];
way(around:500,{lat},{lon})["natural"="water"];
node(around:500,{lat},{lon})["place"="city"];
node(around:500,{lat},{lon})["place"="town"];
node(around:500,{lat},{lon})["place"="village"];
node(around:500,{lat},{lon})["natural"="peak"];
node(around:500,{lat},{lon})["natural"="mountain"];
);
out tags center;
"""
for endpoint in overpass_endpoints:
response = requests.get(endpoint, params={'data': overpass_query}, timeout=60)
if response.status_code == 200:
data = response.json()
break
for element in data.get('elements', []):
tags = element.get('tags', {})
if 'landuse' in tags:
all_landuse.append(tags['landuse'])
elif 'natural' in tags:
all_natural.append(tags['natural'])
if tags['natural'] in ['peak', 'mountain'] and 'name' in tags:
all_key_objects.add(f"Mountain: {tags['name']}")
if 'waterway' in tags and tags['waterway'] in ['river', 'stream'] and 'name' in tags:
all_key_objects.add(f"River: {tags['name']}")
if 'place' in tags and tags['place'] in ['city', 'town', 'village'] and 'name' in tags:
all_key_objects.add(f"Settlement: {tags['name']} ({tags['place']})")
if element.get('type') == 'way' and 'natural' in tags and tags['natural'] == 'water' and 'name' in tags:
all_key_objects.add(f"Lake: {tags['name']}")
time.sleep(1.5)
terrain_type = "unknown"
if all_landuse:
terrain_type = max(set(all_landuse), key=all_landuse.count)
elif all_natural:
terrain_type = max(set(all_natural), key=all_natural.count)
key_objects_str = "; ".join(sorted(all_key_objects)) if all_key_objects else None
df["terrain_type"] = terrain_type
df["key_objects_str"] = key_objects_str
except Exception as e:
print("Ошибка", e)
return df
Расчет физических параметров
from geopy.distance import geodesic
def step_frequency(df):
"""
Рассчитывает частоту шагов на основе расстояния между точками
"""
points = list(zip(df["latitude"], df["longitude"]))
step = [0]
for p1, p2 in zip(points, points[1:]):
dist = geodesic(p1, p2).meters
step.append(dist / 0.75) # Предполагаем среднюю длину шага 0.75 метра
df["steps"] = step
return df
Аугментация изображений
from PIL import Image, ImageEnhance
from random import randint, uniform
def data_augmentation():
"""
Создает аугментированные версии изображений треков
"""
images_path = "data/image"
os.makedirs(images_path, exist_ok=True)
for filename in os.listdir(images_path):
if filename.lower().endswith(".png") and not any(word in filename for word in ["rotated", "contrasted", "brightness"]):
img_path = os.path.join(images_path, filename)
img = Image.open(img_path)
base_name = os.path.splitext(filename)[0]
# Поворот
rotated_img = img.rotate(randint(10, 60))
rotated_img.save(os.path.join(images_path, f"{base_name}_rotated.png"))
# Изменение контраста
contrasted_img = ImageEnhance.Contrast(img).enhance(randint(2, 4))
contrasted_img.save(os.path.join(images_path, f"{base_name}_contrasted.png"))
# Изменение яркости
brightness_img = ImageEnhance.Brightness(img).enhance(uniform(1.2, 1.6))
brightness_img.save(os.path.join(images_path, f"{base_name}_brightness.png"))
Визуализация распределений
import seaborn as sns
def norm_or_not(df):
"""
Визуализирует распределения всех числовых колонок
"""
n = len(df.columns) // 5 + bool(len(df.columns) % 5)
_, axes = plt.subplots(nrows=n, ncols=5, figsize=(15,20))
for idx, i in enumerate(df.columns):
sns.kdeplot(data=df, x=i, common_norm=False, ax=axes[idx // 5][idx%5])
plt.title(i)
plt.show()
Полный пайплайн обработки
# Пример использования всего пайплайна
def main():
# 1. Скачиваем треки
links = [
"https://example.com/track1.gpx",
"https://example.com/track2.gpx",
"https://example.com/track3.gpx"
]
download_gpx(links)
# 2. Конвертируем в изображения и DataFrame
df = gpx_to_png(df)
# 3. Форматируем даты
df['track_time'] = df['track_time'].dt.strftime('%Y-%m-%d')
# 4. Получаем погодные данные
df = get_temp(df)
# 5. Определяем регион
df = analysis_region(df)
# 6. Рассчитываем частоту шагов
df = step_frequency(df)
# 7. Определяем тип местности
df = terrain_type(df)
# 8. Визуализируем распределения
norm_or_not(df)
# 9. Аугментируем изображения
data_augmentation()
print("Обработка завершена!")
print(f"Обработано {len(df)} точек")
print(f"Сохранено треков: {len(os.listdir('data/gpx'))}")
return df
if __name__ == "__main__":
result_df = main()
Заключение
Этот проект демонстрирует, как можно превратить сырые GPS-данные в богатый источник аналитики. Комбинация географического, метеорологического и статистического анализа позволяет извлечь максимум информации из, казалось бы, простых треков.
Код модульный и легко расширяемый — можно добавлять новые источники данных, метрики и виды анализа. И все это — на открытых данных и бесплатных API.
Требования: Python 3.8+, установка зависимостей:
Важно: Все используемые API бесплатны для некоммерческого использования, но имеют лимиты запросов.
Автор: bald_from_bra33ers
