seattlecitylight-mastodon-bot/scl.py

534 lines
20 KiB
Python

import io
from datetime import datetime
from typing import Optional
import mastodon
import requests
import shapely
import staticmap
import yaml
from mastodon import Mastodon
from PIL import Image, ImageDraw, ImageFont
from shapely import Geometry
from sqlalchemy import create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
from geospatial import DBGeometry, convert_outage_geometry
post_datetime_format = "%b %e %l:%M %p"
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(scl_events_url)
try:
scl_events = scl_events_response.json()
except requests.JSONDecodeError:
print("JSON could not be loaded from SCL API")
raise
config = yaml.safe_load(open("config.yml"))
stadiamaps_api_key = config["stadiamaps"]["api_key"]
nominatim_url = config["nominatim"]["api_base_url"]
mastodon_client = Mastodon(
client_id=config["mastodon"]["client_id"],
client_secret=config["mastodon"]["client_secret"],
access_token=config["mastodon"]["access_token"],
api_base_url=config["mastodon"]["api_base_url"],
)
class AttribStaticMap(staticmap.StaticMap, object):
def __init__(self, *args, **kwargs):
self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
super(AttribStaticMap, self).__init__(*args, **kwargs)
def _draw_features(self, image):
super(AttribStaticMap, self)._draw_features(image)
txt = Image.new("RGBA", image.size, (255, 255, 255, 0))
fnt = ImageFont.truetype("fonts/PublicSans-Regular.otf", 24)
d = ImageDraw.Draw(txt)
textSize = fnt.getbbox(self.attribution)
textPosition = (image.size[0] - textSize[2], image.size[1] - textSize[3])
offset = 2
options = {"fill": (255, 255, 255, 180)}
d.rectangle(
[
(textPosition[0] - (2 * offset), textPosition[1] - (2 * offset)),
(
textSize[2] + textPosition[0] + (2 * offset),
textSize[3] + textPosition[1] + (2 * offset),
),
],
**options,
)
# draw text, full opacity
d.text(
(textPosition[0] - offset, textPosition[1] - offset),
self.attribution,
font=fnt,
fill="black",
)
image.paste(txt, (0, 0), txt)
def classify_event_size(num_people: int) -> dict[str, str | bool]:
if num_people < 250:
return {
"size": "Small",
"outage_color": "#F97316",
"is_postable": False,
}
elif num_people < 1000:
return {
"size": "Medium",
"outage_color": "#EF4444",
"is_postable": True,
}
else:
return {
"size": "Large",
"outage_color": "#991B1B",
"is_postable": True,
}
def get_hashtag_string(event) -> str:
city = str()
try:
city = event["geoloc_city"]
except KeyError:
city = event["city"]
neighborhood_text = str()
try:
neighborhood = event["neighborhood"]
if neighborhood != city:
neighborhood_text = " #{}".format(neighborhood.title().replace(" ", ""))
except KeyError:
pass
hashtag_string = "#SeattleCityLightOutage #SCLOutage{} #{}".format(
neighborhood_text, city.title().replace(" ", "")
)
return hashtag_string
def get_alt_text_string(event) -> str:
try:
street = event["geoloc_street"]
city = event["geoloc_city"]
if city != "Seattle":
city_not_seattle_text = " of {}".format(city)
else:
city_not_seattle_text = ""
try:
locality = event["neighborhood"]
except KeyError:
return "A map showing the location of the outage, centered around {} in {}.".format(
street,
city,
)
if locality == "Uptown":
locality = "Lower Queen Anne"
return "A map showing the location of the outage, centered around {} in the {} area{}.".format(
street,
locality,
city_not_seattle_text,
)
except Exception:
return "A map showing the location of the outage."
def do_initial_post(
event,
event_class,
start_time: datetime,
estimated_restoration_time: datetime,
outage_geometries: shapely.MultiPolygon,
) -> dict[str, str | None]:
post_id = None
map_media_post_id = None
# Fallback location from the SCL API in case one couldn't be reverse geocoded
area_text = event["city"]
try:
map = AttribStaticMap(
1024,
1024,
url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}@2x.png?api_key="
+ stadiamaps_api_key,
tile_size=512,
)
assert event["polygons"]["type"] == "polygon"
for ring in event["polygons"]["rings"]:
polygon = staticmap.Polygon(
ring,
# Appending 7F to the fill_color makes it 50% transparent
fill_color="{}7F".format(event_class["outage_color"]),
outline_color=event_class["outage_color"],
simplify=True,
)
map.add_polygon(polygon)
try:
outage_center: shapely.Point = outage_geometries.centroid
assert outage_center.geom_type == "Point"
# Check to make sure the calculated lat and lon are sane enough
# NW Corner
assert outage_center.y < 48 and outage_center.x > -122.6
# SE Corner
assert outage_center.y > 47.2 and outage_center.x < -122
# Zoom level 17 ensures that we won't get any building/POI names, just street names
geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
nominatim_url=nominatim_url,
lat=outage_center.y,
lon=outage_center.x,
)
geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
geocode_response = requests.get(geocode_url, headers=geocode_headers)
try:
geocode = geocode_response.json()
except requests.JSONDecodeError:
print("JSON could not be loaded from nominatim API")
raise
city = geocode["features"][0]["properties"]["geocoding"]["city"]
street = geocode["features"][0]["properties"]["geocoding"]["name"]
event["geoloc_city"] = city
event["geoloc_street"] = street
if city != "Seattle":
city_not_seattle_text = " of {}".format(city)
else:
city_not_seattle_text = ""
if (
"locality" in geocode["features"][0]["properties"]["geocoding"]
and event_class["size"] != "Large"
):
locality = geocode["features"][0]["properties"]["geocoding"]["locality"]
if locality == "Uptown":
locality = "Lower Queen Anne"
area_text = "the {} area{}".format(locality, city_not_seattle_text)
event["neighborhood"] = locality
elif "district" in geocode["features"][0]["properties"]["geocoding"]:
district = geocode["features"][0]["properties"]["geocoding"]["district"]
area_text = "the {} area{}".format(
district,
city_not_seattle_text,
)
event["neighborhood"] = district
else:
area_text = city
except Exception:
pass
map_image = map.render()
with io.BytesIO() as map_image_file:
map_image.save(map_image_file, format="WebP", method=6)
map_media_post = mastodon_client.media_post(
map_image_file.getvalue(),
mime_type="image/webp",
description=get_alt_text_string(event),
)
map_media_post_id = map_media_post["id"]
except Exception as e:
print(e)
print(
"Ran into an issue with generating/uploading the map. Will post without it."
)
map_media_post = None
hashtag_string = get_hashtag_string(event)
est_restoration_post_text = str()
if estimated_restoration_time > datetime.now():
est_restoration_post_text = "\nEst. Restoration: {}\n".format(
estimated_restoration_time.strftime(post_datetime_format)
)
post_text = """Seattle City Light is reporting a {} outage in {}.
Start Date: {}{}
Cause: {}
{}""".format(
event_class["size"].lower(),
area_text,
start_time.strftime(post_datetime_format),
est_restoration_post_text,
event["cause"],
hashtag_string,
)
print(
"Posting the following to Mastodon, with a post length of {}:\n{}".format(
len(post_text), post_text
)
)
post_result = mastodon_client.status_post(
status=post_text,
media_ids=map_media_post,
visibility="public",
language="en",
)
post_id = post_result["id"]
return {"post_id": post_id, "map_media_post_id": map_media_post_id}
class Base(DeclarativeBase):
type_annotation_map = {
Geometry: DBGeometry,
}
class SclOutage(Base):
__tablename__ = "scl_outages"
scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
outage_user_id: Mapped[str] = mapped_column()
most_recent_post_id: Mapped[Optional[str]] = mapped_column()
initial_post_id: Mapped[Optional[str]] = mapped_column()
map_media_post_id: Mapped[Optional[str]] = mapped_column()
last_updated_time: Mapped[datetime] = mapped_column()
estimated_restoration_time: Mapped[datetime] = mapped_column()
cause: Mapped[str] = mapped_column()
status: Mapped[Optional[str]] = mapped_column()
no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
start_time: Mapped[datetime] = mapped_column()
num_people: Mapped[int] = mapped_column()
max_num_people: Mapped[int] = mapped_column()
neighborhood: Mapped[Optional[str]] = mapped_column()
city: Mapped[Optional[str]] = mapped_column()
outage_geometries: Mapped[Optional[Geometry]] = mapped_column()
geometries_modified: Mapped[Optional[bool]] = mapped_column()
def __repr__(self) -> str:
return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}, start_time={self.start_time!r}, num_people={self.num_people!r}, max_num_people={self.max_num_people!r}, neighborhood={self.neighborhood!r}, city={self.city!r}, outage_geometries={self.outage_geometries!r}, geometries_modified={self.geometries_modified!r})"
engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)
with Session(engine) as session:
for event in scl_events:
print("Processing outage with Internal ID {}".format(event["id"]))
start_time = datetime.fromtimestamp(event["startTime"] / 1000)
last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)
lookup_statement = select(SclOutage).where(
SclOutage.scl_outage_id == event["id"]
)
lookup_result = session.scalars(lookup_statement)
event_class = classify_event_size(event["numPeople"])
if "status" in event:
status = event["status"]
else:
status = None
outage_geometries = convert_outage_geometry(event)
try:
hashtag_string = get_hashtag_string(event)
existing_record = lookup_result.one()
updated_properties = []
updated_entries = []
est_restoration_diff_mins = (
abs(
(
estimated_restoration_time
- existing_record.estimated_restoration_time
).total_seconds()
)
/ 60
)
# Only post if estimated restoration time has changed by 60m or more
if est_restoration_diff_mins >= 60:
existing_record.estimated_restoration_time = estimated_restoration_time
if estimated_restoration_time > datetime.now():
# New estimated restoration time is in the future, so likely to be a real time
updated_properties.append("estimated restoration")
updated_entries.append(
"Est. Restoration: {}".format(
estimated_restoration_time.strftime(post_datetime_format)
)
)
if event["cause"] != existing_record.cause:
existing_record.cause = event["cause"]
updated_properties.append("cause")
updated_entries.append("Cause: {}".format(event["cause"]))
previous_event_class = classify_event_size(existing_record.num_people)
if event_class["size"] != previous_event_class["size"]:
updated_properties.append("outage size")
updated_entries.append("Outage Size: {}".format(event_class["size"]))
if status != existing_record.status:
existing_record.status = status
updated_properties.append("status")
updated_entries.append("Status: {}".format(status))
if existing_record.num_people != event["numPeople"]:
existing_record.num_people = event["numPeople"]
if existing_record.max_num_people < event["numPeople"]:
# Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about
existing_record.max_num_people = event["numPeople"]
max_event_class = classify_event_size(existing_record.max_num_people)
if existing_record.outage_geometries != outage_geometries:
print("Geometries modified")
existing_record.outage_geometries = outage_geometries
existing_record.geometries_modified = True
if updated_properties:
updated_properties.sort()
updated_entries.sort()
if len(updated_properties) == 1:
updated_entries.insert(
0,
"The {} of this outage has been updated.\n".format(
updated_properties[0]
),
)
else:
# TODO: this currently just smashes all of the properties together with commas, it'd be nice to make it actually format it like a sentence
updated_entries.insert(
0,
"The {} of this outage have been updated.\n".format(
", ".join(updated_properties)
),
)
if max_event_class["is_postable"] and existing_record.initial_post_id:
try:
post_result = mastodon_client.status_post(
status="\n".join(updated_entries),
in_reply_to_id=existing_record.most_recent_post_id,
visibility="unlisted",
language="en",
)
existing_record.most_recent_post_id = post_result["id"]
except mastodon.MastodonNotFoundError:
print(
"Could not post a reply to the existing post, skip this update"
)
elif max_event_class["is_postable"]:
print(
"Posting an event that grew above the threshold required to post"
)
initial_post_result = do_initial_post(
event,
event_class,
start_time,
estimated_restoration_time,
outage_geometries,
)
try:
existing_record.neighborhood = initial_post_result[
"neighborhood"
]
except KeyError:
pass
try:
existing_record.city = initial_post_result["city"]
except KeyError:
pass
existing_record.initial_post_id = initial_post_result["post_id"]
existing_record.most_recent_post_id = initial_post_result["post_id"]
existing_record.map_media_post_id = initial_post_result[
"map_media_post_id"
]
else:
print("Existing record was found, and no properties were updated.")
session.commit()
except NoResultFound:
print("Existing record not found")
post_id = None
map_media_post_id = None
neighborhood = None
city = None
if not event_class["is_postable"]:
print(
"Outage is {} considered postable, will not post".format(
event_class["size"]
)
)
else:
initial_post_result = do_initial_post(
event,
event_class,
start_time,
estimated_restoration_time,
outage_geometries,
)
post_id = initial_post_result["post_id"]
map_media_post_id = initial_post_result["map_media_post_id"]
try:
neighborhood = initial_post_result["neighborhood"]
except KeyError:
pass
try:
city = initial_post_result["city"]
except KeyError:
pass
new_outage_record = SclOutage(
scl_outage_id=event["id"],
outage_user_id=event["identifier"],
most_recent_post_id=post_id,
initial_post_id=post_id,
map_media_post_id=map_media_post_id,
last_updated_time=last_updated_time,
estimated_restoration_time=estimated_restoration_time,
cause=event["cause"],
status=status,
start_time=start_time,
num_people=event["numPeople"],
max_num_people=event["numPeople"],
neighborhood=neighborhood,
city=city,
outage_geometries=outage_geometries,
)
session.add(new_outage_record)
session.commit()
lookup_active_outages_statement = select(SclOutage).where(
SclOutage.no_longer_in_response_time == None # noqa: E711 - Syntax must stay this way for SQLAlchemy
)
for active_outage in session.scalars(lookup_active_outages_statement):
if (
not any(event["id"] == active_outage.scl_outage_id for event in scl_events)
and scl_events
):
# Event ID no longer exists in response
if active_outage.most_recent_post_id:
try:
post_result = mastodon_client.status_post(
status="This outage is no longer in the SCL feed, which usually means it's either been resolved, or split into multiple smaller outages.\n\n#SeattleCityLightOutage #SCLOutage",
in_reply_to_id=active_outage.most_recent_post_id,
visibility="public",
language="en",
)
active_outage.most_recent_post_id = post_result["id"]
except mastodon.MastodonNotFoundError:
print(
"The outage post couldn't be replied to, it was externally deleted."
)
active_outage.no_longer_in_response_time = datetime.now()
session.commit()