seattlecitylight-mastodon-bot/scl.py

357 lines
14 KiB
Python

import io
import math
from datetime import datetime
from typing import Optional
import requests
import yaml
from mastodon import Mastodon
from PIL import Image, ImageDraw, ImageFont
from sqlalchemy import create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
from staticmap import Polygon, StaticMap
post_datetime_format = "%b %e %l:%M %p"
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(scl_events_url)
try:
scl_events = scl_events_response.json()
except requests.JSONDecodeError:
print("JSON could not be loaded from SCL API")
raise
config = yaml.safe_load(open("config.yml"))
stadiamaps_api_key = config["stadiamaps"]["api_key"]
nominatim_url = config["nominatim"]["api_base_url"]
mastodon = Mastodon(
client_id=config["mastodon"]["client_id"],
client_secret=config["mastodon"]["client_secret"],
access_token=config["mastodon"]["access_token"],
api_base_url=config["mastodon"]["api_base_url"],
)
class AttribStaticMap(StaticMap, object):
def __init__(self, *args, **kwargs):
self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
super(AttribStaticMap, self).__init__(*args, **kwargs)
def _draw_features(self, image):
super(AttribStaticMap, self)._draw_features(image)
txt = Image.new("RGBA", image.size, (255, 255, 255, 0))
# get a font
# fnt = ImageFont.truetype('FreeMono.ttf', 12)
fnt = ImageFont.load_default()
# get a drawing context
d = ImageDraw.Draw(txt)
textSize = fnt.getbbox(self.attribution)
textPosition = (image.size[0] - textSize[2], image.size[1] - textSize[3])
offset = 2
options = {"fill": (255, 255, 255, 180)}
d.rectangle(
[
(textPosition[0] - (2 * offset), textPosition[1] - (2 * offset)),
(
textSize[2] + textPosition[0] + (2 * offset),
textSize[3] + textPosition[1] + (2 * offset),
),
],
**options,
)
# draw text, full opacity
d.text(
(textPosition[0] - offset, textPosition[1] - offset),
self.attribution,
font=fnt,
fill="black",
)
image.paste(txt, (0, 0), txt)
class Base(DeclarativeBase):
pass
class SclOutage(Base):
__tablename__ = "scl_outages"
scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
outage_user_id: Mapped[str] = mapped_column()
most_recent_post_id: Mapped[Optional[str]] = mapped_column()
last_updated_time: Mapped[datetime] = mapped_column()
estimated_restoration_time: Mapped[datetime] = mapped_column()
cause: Mapped[str] = mapped_column()
outage_size: Mapped[str] = mapped_column()
status: Mapped[Optional[str]] = mapped_column()
no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
def __repr__(self) -> str:
return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r})"
engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)
with Session(engine) as session:
for event in scl_events:
print("Processing outage with Internal ID {}".format(event["id"]))
start_time = datetime.fromtimestamp(event["startTime"] / 1000)
last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)
lookup_statement = select(SclOutage).where(
SclOutage.scl_outage_id == event["id"]
)
lookup_result = session.scalars(lookup_statement)
if event["numPeople"] < 250:
outage_size = "Small"
outage_color = "#F97316"
elif event["numPeople"] < 1000:
outage_size = "Medium"
outage_color = "#EF4444"
else:
outage_size = "Large"
outage_color = "991B1B"
if "status" in event:
status = event["status"]
else:
status = None
hashtag_string = "#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
event["identifier"]
)
try:
existing_record = lookup_result.one()
updated_properties = []
updated_entries = []
if estimated_restoration_time != existing_record.estimated_restoration_time:
existing_record.estimated_restoration_time = estimated_restoration_time
updated_properties.append("estimated restoration")
updated_entries.append(
"Est. Restoration: {}".format(
estimated_restoration_time.strftime(post_datetime_format)
)
)
if event["cause"] != existing_record.cause:
existing_record.cause = event["cause"]
updated_properties.append("cause")
updated_entries.append("Cause: {}".format(event["cause"]))
if outage_size != existing_record.outage_size:
existing_record.outage_size = outage_size
updated_properties.append("outage size")
updated_entries.append("Outage Size: {}".format(outage_size))
if status != existing_record.status:
existing_record.status = status
updated_properties.append("status")
updated_entries.append("Status: {}".format(status))
if updated_properties:
updated_properties.sort()
updated_entries.sort()
if len(updated_properties) == 1:
updated_entries.insert(
0,
"The {} of this outage has been updated.\n".format(
updated_properties[0]
),
)
else:
# TODO: this currently just smashes all of the properties together with commas, it'd be nice to make it actually format it like a sentence
updated_entries.insert(
0,
"The {} of this outage have been updated.\n".format(
", ".join(updated_properties)
),
)
updated_entries.append("")
updated_entries.append(hashtag_string)
mastodon_post_result = mastodon.status_post(
status="\n".join(updated_entries),
in_reply_to_id=existing_record.most_recent_post_id,
visibility="public",
language="en",
)
existing_record.most_recent_post_id = mastodon_post_result["id"]
session.commit()
except NoResultFound:
if outage_size == "Small":
# If the outage becomes medium/large, it'll then be posted as a new outage on the next run
print("Outage is small, will not post")
continue
print("Existing record not found")
try:
map = AttribStaticMap(
512,
512,
url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key="
+ stadiamaps_api_key,
)
assert event["polygons"]["type"] == "polygon"
for ring in event["polygons"]["rings"]:
polygon = Polygon(
ring, "{}7F".format(outage_color), outage_color, simplify=True
)
map.add_polygon(polygon)
map_image = map.render()
try:
def num2deg(xtile, ytile, zoom):
n = 1 << zoom
lon_deg = xtile / n * 360.0 - 180.0
lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
lat_deg = math.degrees(lat_rad)
return lat_deg, lon_deg
center_lat_lon = num2deg(map.x_center, map.y_center, map.zoom)
# Zoom level 17 ensures that we won't get any building/POI names, just street names
geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
nominatim_url=nominatim_url,
lat=center_lat_lon[0],
lon=center_lat_lon[1],
)
geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
geocode_response = requests.get(
geocode_url, headers=geocode_headers
)
try:
geocode = geocode_response.json()
except requests.JSONDecodeError:
print("JSON could not be loaded from nominatim API")
raise
if (
geocode["features"][0]["properties"]["geocoding"]["city"]
!= "Seattle"
):
city_not_seattle_text = " of {}".format(
geocode["features"][0]["properties"]["geocoding"]["city"]
)
else:
city_not_seattle_text = ""
street = geocode["features"][0]["properties"]["geocoding"]["name"]
if "locality" in geocode["features"][0]["properties"]["geocoding"]:
locality = geocode["features"][0]["properties"]["geocoding"]
if locality == "Uptown":
locality = "Lower Queen Anne"
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
street,
locality,
city_not_seattle_text,
)
area_text = "the {} area{}".format(
locality, city_not_seattle_text
)
elif (
"district" in geocode["features"][0]["properties"]["geocoding"]
):
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
street,
geocode["features"][0]["properties"]["geocoding"][
"district"
],
city_not_seattle_text,
)
area_text = "the {} area{}".format(
geocode["features"][0]["properties"]["geocoding"][
"district"
],
city_not_seattle_text,
)
else:
alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
street,
geocode["features"][0]["properties"]["geocoding"]["city"],
)
area_text = geocode["features"][0]["properties"]["geocoding"][
"city"
]
except Exception:
alt_text = "A map showing the location of the outage."
with io.BytesIO() as map_image_file:
map_image.save(map_image_file, format="PNG", optimize=True)
map_media_post = mastodon.media_post(
map_image_file.getvalue(),
mime_type="image/png",
description=alt_text,
)
except Exception as e:
print(e)
print(
"Ran into an issue with generating/uploading the map. Will post without it."
)
map_media_post = None
# Fallback location from the SCL API in case one couldn't be reverse geocoded
if not area_text:
area_text = event["city"]
post_text = """Seattle City Light is reporting a {} outage in {}.
Start Date: {}
Est. Restoration: {}
Cause: {}
{}""".format(
outage_size.lower(),
area_text,
start_time.strftime(post_datetime_format),
estimated_restoration_time.strftime(post_datetime_format),
event["cause"],
hashtag_string,
)
mastodon_post_result = mastodon.status_post(
status=post_text,
media_ids=map_media_post,
visibility="public",
language="en",
)
new_outage_record = SclOutage(
scl_outage_id=event["id"],
outage_user_id=event["identifier"],
most_recent_post_id=mastodon_post_result["id"],
last_updated_time=last_updated_time,
estimated_restoration_time=estimated_restoration_time,
cause=event["cause"],
status=status,
outage_size=outage_size,
)
session.add(new_outage_record)
session.commit()
lookup_active_outages_statement = select(SclOutage).where(
SclOutage.no_longer_in_response_time == None
)
for active_outage in session.scalars(lookup_active_outages_statement):
if not any(event["id"] == active_outage.scl_outage_id for event in scl_events):
# Event ID no longer exists in response
mastodon_post_result = mastodon.status_post(
status="This outage is reported to be resolved.\n\n#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
active_outage.outage_user_id
),
in_reply_to_id=active_outage.most_recent_post_id,
visibility="public",
language="en",
)
active_outage.most_recent_post_id = mastodon_post_result["id"]
active_outage.no_longer_in_response_time = datetime.now()
session.commit()