tt/main.py
Mohamad.Elsena 5dc71dbf90 weeee💃
2025-02-10 09:48:27 +01:00

90 lines
3.6 KiB
Python

import requests
import uuid
import time
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from database import get_db_connection, create_tables
from config import DB_API_KEY, DB_API_URL, WEATHER_API_KEY, WEATHER_API_URL, STATION_IDS
# Module-level scheduler instance; the periodic fetch jobs further down in this
# file are registered on it, and it is started from the ``__main__`` guard.
scheduler = BlockingScheduler()
def fetch_train_schedules(timeout=30):
    """Fetch the departure board of every configured station and store it.

    Iterates over ``STATION_IDS`` (station name -> station id), requests
    ``{DB_API_URL}/departureBoard`` for each id using the bearer token
    ``DB_API_KEY`` and hands the decoded JSON payload to ``store_schedules``.

    A failure for one station (network error, non-200 status, invalid JSON)
    is reported with ``print`` and the loop continues with the next station,
    so a single bad station does not abort the whole run.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block forever, which would
            stall the scheduler job.

    Raises:
        Any exception raised by ``store_schedules`` propagates to the caller.
    """
    # The header does not depend on the station, so build it once.
    headers = {"Authorization": f"Bearer {DB_API_KEY}"}

    for station_name, station_id in STATION_IDS.items():
        try:
            response = requests.get(
                f"{DB_API_URL}/departureBoard",
                # Let requests build and URL-encode the query string.
                params={"station": station_id},
                headers=headers,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            print(f"Failed to fetch schedules for {station_name}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Failed to fetch schedules for {station_name}: {response.status_code}")
            continue

        try:
            schedules = response.json()
        except ValueError as exc:
            # A 200 response whose body is not valid JSON.
            print(f"Failed to fetch schedules for {station_name}: {exc}")
            continue

        store_schedules(schedules, station_id)
def store_schedules(schedules, station_id):
    """Insert a batch of schedule records into the ``schedules`` table.

    Each record gets a freshly generated UUID as ``schedule_id``; the
    ``departure_station`` column is filled with ``station_id`` and every other
    column is read from the record with ``.get`` (missing keys become NULL).
    All rows are committed in a single transaction.

    Args:
        schedules: Iterable of dict-like schedule records.
            NOTE(review): assumes the API payload is a list of objects whose
            keys match the column names used below - confirm against the API.
        station_id: Identifier stored as the departure station of every row.

    Raises:
        Whatever the database driver raises on failure. The cursor and the
        connection are closed in every case; nothing is committed on error.
    """
    rows = [
        (
            str(uuid.uuid4()), schedule.get("train_id"), schedule.get("route_id"),
            station_id, schedule.get("arrival_station"),
            schedule.get("scheduled_departure"), schedule.get("scheduled_arrival"),
            schedule.get("platform_departure"), schedule.get("platform_arrival"),
            schedule.get("distance_km"), schedule.get("scheduled_stops"),
            schedule.get("service_type"),
        )
        for schedule in schedules
    ]
    if not rows:
        # Nothing to insert: do not open a connection for an empty batch.
        return

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            cur.executemany(
                """
                INSERT INTO schedules (
                    schedule_id, train_id, route_id, departure_station, arrival_station,
                    scheduled_departure, scheduled_arrival, platform_departure, platform_arrival,
                    distance_km, scheduled_stops, service_type
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                rows,
            )
            conn.commit()
        finally:
            cur.close()
    finally:
        # Always release the connection, even when an insert or commit fails.
        conn.close()
def fetch_weather_data(timeout=30):
    """Fetch current weather for every configured station and store it.

    For each entry of ``STATION_IDS`` the weather API at ``WEATHER_API_URL``
    is queried with ``lat``, ``lon`` and ``appid`` parameters, and a successful
    JSON response is passed to ``store_weather_data`` together with the
    station id. A failure for one station is reported with ``print`` and the
    loop continues with the next station.

    The coordinates are read from ``config.STATION_LATITUDE`` and
    ``config.STATION_LONGITUDE``. The original code referenced these names
    without importing or defining them, which raised ``NameError`` on every
    run; if they are absent from ``config`` the fetch is skipped with a
    message instead of crashing.

    NOTE(review): a single coordinate pair is used for all stations, so every
    station receives the same weather reading. Per-station coordinates are
    probably intended - confirm and extend ``config`` accordingly.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up.

    Raises:
        Any exception raised by ``store_weather_data`` propagates to the caller.
    """
    # Local import: resolve the coordinates lazily so that a config module
    # without them does not break importing this file.
    import config

    latitude = getattr(config, "STATION_LATITUDE", None)
    longitude = getattr(config, "STATION_LONGITUDE", None)
    if latitude is None or longitude is None:
        print(
            "Skipping weather fetch: STATION_LATITUDE / STATION_LONGITUDE "
            "are not defined in config"
        )
        return

    query = {"lat": latitude, "lon": longitude, "appid": WEATHER_API_KEY}

    for station_name, station_id in STATION_IDS.items():
        try:
            response = requests.get(WEATHER_API_URL, params=query, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Failed to fetch weather for {station_name}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Failed to fetch weather for {station_name}: {response.status_code}")
            continue

        try:
            weather = response.json()
        except ValueError as exc:
            # A 200 response whose body is not valid JSON.
            print(f"Failed to fetch weather for {station_name}: {exc}")
            continue

        store_weather_data(weather, station_id)
def store_weather_data(weather, station_id):
    """Insert one weather observation into the ``weather_history`` table.

    The row gets a freshly generated UUID as ``weather_id`` and the current
    local time (``datetime.now()``) as ``timestamp``. The remaining columns
    are read from the nested ``main``, ``rain``, ``wind``, ``visibility`` and
    ``weather`` entries of the payload; absent values become NULL, except
    precipitation, which defaults to 0 when no ``rain["1h"]`` value is present.

    Args:
        weather: Dict-like weather payload.
            NOTE(review): the key layout looks like an OpenWeatherMap
            "current weather" response - confirm against the configured API.
        station_id: Identifier of the station the observation belongs to.

    Raises:
        Whatever the database driver raises on failure. The cursor and the
        connection are closed in every case; nothing is committed on error.
    """
    # ``or`` also covers keys that are present but null or empty, which would
    # otherwise raise AttributeError / IndexError below.
    main = weather.get("main") or {}
    rain = weather.get("rain") or {}
    wind = weather.get("wind") or {}
    conditions = weather.get("weather") or [{}]

    row = (
        str(uuid.uuid4()), station_id, datetime.now(), main.get("temp"),
        rain.get("1h", 0), wind.get("speed"),
        wind.get("deg"), main.get("humidity"),
        main.get("pressure"), weather.get("visibility"),
        conditions[0].get("main"),
    )

    conn = get_db_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                INSERT INTO weather_history (
                    weather_id, station_id, timestamp, temperature, precipitation,
                    wind_speed, wind_direction, humidity, pressure, visibility, weather_condition
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                row,
            )
            conn.commit()
        finally:
            cur.close()
    finally:
        # Always release the connection, even when the insert or commit fails.
        conn.close()
# Register the recurring jobs on the module-level scheduler:
# train schedules every 10 minutes, weather once per hour.
scheduler.add_job(fetch_train_schedules, trigger='interval', minutes=10)
scheduler.add_job(fetch_weather_data, trigger='interval', hours=1)

if __name__ == "__main__":
    # Run as a script: create the tables first, then start the scheduler.
    create_tables()
    scheduler.start()