seattlecitylight-mastodon-bot/scl.py

435 lines
17 KiB
Python
Raw Normal View History

2024-01-13 17:13:38 -08:00
import io
2024-01-14 12:27:02 -08:00
import math
2024-01-13 14:24:00 -08:00
from datetime import datetime
2024-01-13 15:01:11 -08:00
from typing import Optional
2024-01-31 21:34:58 -08:00
import mastodon
2024-01-13 14:24:00 -08:00
import requests
2024-01-14 12:27:02 -08:00
import yaml
2024-01-13 14:24:00 -08:00
from mastodon import Mastodon
2024-01-13 17:13:38 -08:00
from PIL import Image, ImageDraw, ImageFont
2024-01-13 15:01:11 -08:00
from sqlalchemy import create_engine, select
2024-01-13 14:24:00 -08:00
from sqlalchemy.exc import NoResultFound
2024-01-13 15:01:11 -08:00
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
2024-01-13 17:13:38 -08:00
from staticmap import Polygon, StaticMap
2024-01-13 14:24:00 -08:00
# strftime pattern used for every timestamp shown in a post.
# NOTE(review): %e and %l are C-library extensions rather than directives
# Python guarantees on every platform - confirm the deployment target supports them.
post_datetime_format = "%b %e %l:%M %p"

# Fetch the current outage events once, at start-up. A timeout is passed so an
# unresponsive API makes the run fail instead of hanging indefinitely.
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
scl_events_response = requests.get(scl_events_url, timeout=30)
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

# Load the local configuration; the context manager closes the file handle
# as soon as the YAML has been parsed.
with open("config.yml") as config_file:
    config = yaml.safe_load(config_file)
stadiamaps_api_key = config["stadiamaps"]["api_key"]
nominatim_url = config["nominatim"]["api_base_url"]

# Shared Mastodon API client used for every media upload and status post.
mastodon_client = Mastodon(
    client_id=config["mastodon"]["client_id"],
    client_secret=config["mastodon"]["client_secret"],
    access_token=config["mastodon"]["access_token"],
    api_base_url=config["mastodon"]["api_base_url"],
)
2024-01-13 17:13:38 -08:00
class AttribStaticMap(StaticMap):
    """StaticMap subclass that stamps a tile attribution notice onto the image.

    The notice is drawn in the bottom-right corner on a translucent white
    box after the parent class has drawn its own features.
    """

    def __init__(self, *args, **kwargs):
        """Store the attribution text, then forward all arguments to StaticMap."""
        self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
        super().__init__(*args, **kwargs)

    def _draw_features(self, image):
        """Draw the parent's features on ``image``, then overlay the attribution."""
        super()._draw_features(image)

        # Transparent layer the same size as the map; it is pasted over the
        # map at the end using itself as the alpha mask.
        overlay = Image.new("RGBA", image.size, (255, 255, 255, 0))
        font = ImageFont.load_default()
        canvas = ImageDraw.Draw(overlay)

        # Only the right/bottom edges of the text bounding box are needed.
        _, _, text_right, text_bottom = font.getbbox(self.attribution)
        image_width, image_height = image.size
        text_x = image_width - text_right
        text_y = image_height - text_bottom

        offset = 2
        padding = 2 * offset

        # Translucent white backdrop behind the text
        canvas.rectangle(
            [
                (text_x - padding, text_y - padding),
                (text_right + text_x + padding, text_bottom + text_y + padding),
            ],
            fill=(255, 255, 255, 180),
        )
        # Attribution text, full opacity
        canvas.text(
            (text_x - offset, text_y - offset),
            self.attribution,
            font=font,
            fill="black",
        )

        image.paste(overlay, (0, 0), overlay)
2024-01-13 14:24:00 -08:00
2024-02-15 07:53:17 -08:00
def classify_event_size(num_people: int) -> dict[str, str | bool]:
2024-01-30 07:58:11 -08:00
if num_people < 250:
return {
"size": "Small",
"outage_color": "#F97316",
"is_postable": False,
}
2024-02-13 20:42:02 -08:00
elif num_people < 1000:
2024-01-30 07:58:11 -08:00
return {
"size": "Medium",
"outage_color": "#EF4444",
"is_postable": True,
}
else:
return {
"size": "Large",
"outage_color": "#991B1B",
"is_postable": True,
}
2024-02-15 07:53:17 -08:00
def get_hashtag_string(event) -> str:
    """Return the hashtag line for an outage post.

    The line holds two fixed hashtags followed by one that ends with the
    event's ``identifier`` value.
    """
    return f"#SeattleCityLightOutage #SCLOutage #SCLOutage{event['identifier']}"
def _num2deg(xtile, ytile, zoom):
    """Convert slippy-map tile coordinates at ``zoom`` to ``(lat, lon)`` in degrees."""
    n = 1 << zoom
    lon_deg = xtile / n * 360.0 - 180.0
    lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
    return math.degrees(lat_rad), lon_deg


def _describe_outage_location(lat, lon, event_class):
    """Reverse geocode a point and return ``(alt_text, area_text)`` for a post.

    Raises an exception when the point lies outside the expected bounding box,
    when the geocoder cannot be reached or returns invalid JSON, or when the
    response lacks the expected fields. The caller treats any exception as
    "no location description available".
    """
    # Check to make sure the calculated lat and lon are sane enough.
    # Explicit raise rather than assert so the check survives `python -O`.
    # Bounds: NW corner (48, -122.6), SE corner (47.2, -122).
    if not (47.2 < lat < 48 and -122.6 < lon < -122):
        raise ValueError(
            "Map center ({}, {}) is outside the expected area".format(lat, lon)
        )

    # Zoom level 17 ensures that we won't get any building/POI names, just street names
    geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
        nominatim_url=nominatim_url,
        lat=lat,
        lon=lon,
    )
    geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
    # A timeout keeps a stalled geocoder from blocking the post forever
    geocode_response = requests.get(geocode_url, headers=geocode_headers, timeout=30)
    try:
        geocode = geocode_response.json()
    except requests.JSONDecodeError:
        print("JSON could not be loaded from nominatim API")
        raise

    geocoding = geocode["features"][0]["properties"]["geocoding"]
    city = geocoding["city"]
    city_not_seattle_text = "" if city == "Seattle" else " of {}".format(city)
    street = geocoding["name"]

    # Large outages skip the locality and fall through to the district or city
    if "locality" in geocoding and event_class["size"] != "Large":
        area_name = geocoding["locality"]
        if area_name == "Uptown":
            area_name = "Lower Queen Anne"
    elif "district" in geocoding:
        area_name = geocoding["district"]
    else:
        alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
            street,
            city,
        )
        return alt_text, city

    alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
        street,
        area_name,
        city_not_seattle_text,
    )
    area_text = "the {} area{}".format(area_name, city_not_seattle_text)
    return alt_text, area_text


def do_initial_post(
    event, event_class, start_time: datetime, estimated_restoration_time: datetime
) -> dict[str, str | None]:
    """Publish the first Mastodon post for an outage, with a map when possible.

    Args:
        event: Outage event dict from the SCL API. Reads ``city``, ``cause``,
            ``polygons`` (``type`` and ``rings``) and, through
            ``get_hashtag_string``, ``identifier``.
        event_class: Result of ``classify_event_size``; reads ``size`` and
            ``outage_color``.
        start_time: When the outage began.
        estimated_restoration_time: When power is expected to be restored.

    Returns:
        A dict with ``post_id`` (ID of the new status) and
        ``map_media_post_id`` (ID of the uploaded map, or None if the map
        could not be generated or uploaded).

    Map generation, reverse geocoding and the media upload are best effort:
    any failure there is logged and the status is posted without them.
    Errors from the final status post propagate to the caller.
    """
    map_media_post = None
    map_media_post_id = None
    # Fallback location from the SCL API in case one couldn't be reverse geocoded
    area_text = event["city"]

    try:
        outage_map = AttribStaticMap(
            512,
            512,
            url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}.png?api_key="
            + stadiamaps_api_key,
        )
        # Explicit raise rather than assert so the check survives `python -O`
        if event["polygons"]["type"] != "polygon":
            raise ValueError(
                "Unexpected geometry type: {!r}".format(event["polygons"]["type"])
            )
        for ring in event["polygons"]["rings"]:
            outage_map.add_polygon(
                Polygon(
                    ring,
                    # Appending 7F to the fill_color makes it 50% transparent
                    fill_color="{}7F".format(event_class["outage_color"]),
                    outline_color=event_class["outage_color"],
                    simplify=True,
                )
            )
        map_image = outage_map.render()

        try:
            # x_center / y_center / zoom are read from the map after render();
            # presumably tile coordinates - confirm against staticmap.
            center_lat, center_lon = _num2deg(
                outage_map.x_center, outage_map.y_center, outage_map.zoom
            )
            alt_text, area_text = _describe_outage_location(
                center_lat, center_lon, event_class
            )
        except Exception as geocode_error:
            # Best effort: keep the fallback area_text and use a generic description
            print(
                "Could not describe the outage location ({}); using generic alt text.".format(
                    geocode_error
                )
            )
            alt_text = "A map showing the location of the outage."

        with io.BytesIO() as map_image_file:
            map_image.save(map_image_file, format="PNG", optimize=True)
            map_media_post = mastodon_client.media_post(
                map_image_file.getvalue(),
                mime_type="image/png",
                description=alt_text,
            )
        map_media_post_id = map_media_post["id"]
    except Exception as e:
        print(e)
        print(
            "Ran into an issue with generating/uploading the map. Will post without it."
        )
        map_media_post = None

    hashtag_string = get_hashtag_string(event)
    # NOTE(review): the follow-up posts elsewhere in this file put a blank line
    # before the hashtags; confirm whether this template should as well.
    post_text = """Seattle City Light is reporting a {} outage in {}.
Start Date: {}
Est. Restoration: {}
Cause: {}
{}""".format(
        event_class["size"].lower(),
        area_text,
        start_time.strftime(post_datetime_format),
        estimated_restoration_time.strftime(post_datetime_format),
        event["cause"],
        hashtag_string,
    )
    print(
        "Posting the following to Mastodon, with a post length of {}:\n{}".format(
            len(post_text), post_text
        )
    )
    post_result = mastodon_client.status_post(
        status=post_text,
        media_ids=map_media_post,
        visibility="public",
        language="en",
    )
    return {"post_id": post_result["id"], "map_media_post_id": map_media_post_id}
2024-01-13 14:24:00 -08:00
class Base(DeclarativeBase):
    """Declarative base class that this module's ORM models inherit from."""
class SclOutage(Base):
    """One row per outage seen in the SCL events feed, plus its posting state."""

    __tablename__ = "scl_outages"

    # Value of the event's "id" field; used to match feed events to rows
    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    # Value of the event's "identifier" field; appended to the per-outage hashtag
    outage_user_id: Mapped[str] = mapped_column()
    # ID of the latest Mastodon post about this outage; follow-ups reply to it
    most_recent_post_id: Mapped[Optional[str]] = mapped_column()
    # ID of the first Mastodon post; None while the outage has not been posted
    initial_post_id: Mapped[Optional[str]] = mapped_column()
    # ID of the uploaded map image, if one was attached to the first post
    map_media_post_id: Mapped[Optional[str]] = mapped_column()
    last_updated_time: Mapped[datetime] = mapped_column()
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    cause: Mapped[str] = mapped_column()
    status: Mapped[Optional[str]] = mapped_column()
    # Set once the outage stops appearing in the feed; None while still active
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
    start_time: Mapped[datetime] = mapped_column()
    # Most recently reported number of affected people
    num_people: Mapped[int] = mapped_column()
    # Highest number of affected people ever reported for this outage
    max_num_people: Mapped[int] = mapped_column()

    def __repr__(self) -> str:
        """Return a debugging representation with balanced parentheses."""
        return (
            f"SclOutage(scl_outage_id={self.scl_outage_id!r}, "
            f"most_recent_post_id={self.most_recent_post_id!r}, "
            f"initial_post_id={self.initial_post_id!r}, "
            f"map_media_post_id={self.map_media_post_id!r}, "
            f"last_updated_time={self.last_updated_time!r}, "
            f"no_longer_in_response_time={self.no_longer_in_response_time!r}, "
            f"start_time={self.start_time!r}, "
            f"num_people={self.num_people!r}, "
            f"max_num_people={self.max_num_people!r})"
        )
2024-01-13 14:24:00 -08:00
2024-01-31 21:36:32 -08:00
def _apply_event_changes(
    existing_record, event, event_class, status, estimated_restoration_time
):
    """Copy changed values from a feed event onto its stored record.

    Returns ``(updated_properties, updated_entries)``: the names of the
    user-visible properties that changed and the matching "Label: value"
    lines for the update post. Both lists are empty when nothing worth
    announcing changed. The people counts are refreshed on the record but
    are only announced through a change of size class.
    """
    updated_properties = []
    updated_entries = []

    if estimated_restoration_time != existing_record.estimated_restoration_time:
        existing_record.estimated_restoration_time = estimated_restoration_time
        updated_properties.append("estimated restoration")
        updated_entries.append(
            "Est. Restoration: {}".format(
                estimated_restoration_time.strftime(post_datetime_format)
            )
        )

    if event["cause"] != existing_record.cause:
        existing_record.cause = event["cause"]
        updated_properties.append("cause")
        updated_entries.append("Cause: {}".format(event["cause"]))

    # Compare size classes before num_people is overwritten below
    previous_event_class = classify_event_size(existing_record.num_people)
    if event_class["size"] != previous_event_class["size"]:
        updated_properties.append("outage size")
        updated_entries.append("Outage Size: {}".format(event_class["size"]))

    if status != existing_record.status:
        existing_record.status = status
        updated_properties.append("status")
        updated_entries.append("Status: {}".format(status))

    if existing_record.num_people != event["numPeople"]:
        existing_record.num_people = event["numPeople"]
    if existing_record.max_num_people < event["numPeople"]:
        # Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about
        existing_record.max_num_people = event["numPeople"]

    return updated_properties, updated_entries


def _compose_update_post(updated_properties, updated_entries, hashtag_string):
    """Build the text of an update post from the changed properties and lines."""
    properties = sorted(updated_properties)
    entries = sorted(updated_entries)
    if len(properties) == 1:
        header = "The {} of this outage has been updated.\n".format(properties[0])
    else:
        # TODO: this currently just smashes all of the properties together with commas, it'd be nice to make it actually format it like a sentence
        header = "The {} of this outage have been updated.\n".format(
            ", ".join(properties)
        )
    # The header already ends in a newline, so joining leaves a blank line
    # after it; the empty entry leaves another before the hashtags.
    return "\n".join([header, *entries, "", hashtag_string])


engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Pass 1: create or update a record for every outage in the feed.
    for event in scl_events:
        print("Processing outage with Internal ID {}".format(event["id"]))
        # The feed's timestamps are divided by 1000 before conversion
        start_time = datetime.fromtimestamp(event["startTime"] / 1000)
        last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
        estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)
        event_class = classify_event_size(event["numPeople"])
        status = event.get("status")

        lookup_statement = select(SclOutage).where(
            SclOutage.scl_outage_id == event["id"]
        )
        # Keep the try body down to the one call that can raise NoResultFound
        try:
            existing_record = session.scalars(lookup_statement).one()
        except NoResultFound:
            existing_record = None

        if existing_record is None:
            print("Existing record not found")
            post_id = None
            map_media_post_id = None
            if event_class["is_postable"]:
                initial_post_result = do_initial_post(
                    event, event_class, start_time, estimated_restoration_time
                )
                post_id = initial_post_result["post_id"]
                map_media_post_id = initial_post_result["map_media_post_id"]
            else:
                print(
                    "Outage is {} and not considered postable, will not post".format(
                        event_class["size"]
                    )
                )
            session.add(
                SclOutage(
                    scl_outage_id=event["id"],
                    outage_user_id=event["identifier"],
                    most_recent_post_id=post_id,
                    initial_post_id=post_id,
                    map_media_post_id=map_media_post_id,
                    last_updated_time=last_updated_time,
                    estimated_restoration_time=estimated_restoration_time,
                    cause=event["cause"],
                    status=status,
                    start_time=start_time,
                    num_people=event["numPeople"],
                    max_num_people=event["numPeople"],
                )
            )
        else:
            # NOTE(review): last_updated_time is stored for new records only and
            # is never refreshed here - confirm whether that is intended.
            updated_properties, updated_entries = _apply_event_changes(
                existing_record,
                event,
                event_class,
                status,
                estimated_restoration_time,
            )
            # Postability is judged on the largest size the outage ever reached
            max_event_class = classify_event_size(existing_record.max_num_people)
            if updated_properties and max_event_class["is_postable"]:
                if existing_record.initial_post_id:
                    update_text = _compose_update_post(
                        updated_properties,
                        updated_entries,
                        get_hashtag_string(event),
                    )
                    # Same handling as the resolution reply below: a deleted
                    # parent post must not abort the whole run.
                    try:
                        post_result = mastodon_client.status_post(
                            status=update_text,
                            in_reply_to_id=existing_record.most_recent_post_id,
                            visibility="public",
                            language="en",
                        )
                        existing_record.most_recent_post_id = post_result["id"]
                    except mastodon.MastodonNotFoundError:
                        print(
                            "The outage post couldn't be replied to, it was externally deleted."
                        )
                else:
                    print(
                        "Posting an event that grew above the threshold required to post"
                    )
                    initial_post_result = do_initial_post(
                        event, event_class, start_time, estimated_restoration_time
                    )
                    existing_record.initial_post_id = initial_post_result["post_id"]
                    existing_record.most_recent_post_id = initial_post_result["post_id"]
                    existing_record.map_media_post_id = initial_post_result[
                        "map_media_post_id"
                    ]

        # Persist after every outage so a later failure cannot lose the IDs of
        # posts that have already been published.
        session.commit()

    # Pass 2: close out stored outages that no longer appear in the feed.
    # Build the ID set once instead of rescanning scl_events per stored outage.
    current_event_ids = {event["id"] for event in scl_events}
    lookup_active_outages_statement = select(SclOutage).where(
        SclOutage.no_longer_in_response_time == None  # noqa: E711 - Syntax must stay this way for SQLAlchemy
    )
    # Materialised up front because the loop body commits while iterating
    for active_outage in list(session.scalars(lookup_active_outages_statement)):
        # An empty feed never resolves anything - presumably a guard against an
        # empty or failed API response; confirm.
        if not scl_events or active_outage.scl_outage_id in current_event_ids:
            continue

        # Event ID no longer exists in response
        if active_outage.most_recent_post_id:
            try:
                post_result = mastodon_client.status_post(
                    status="This outage is reported to be resolved.\n\n#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(
                        active_outage.outage_user_id
                    ),
                    in_reply_to_id=active_outage.most_recent_post_id,
                    visibility="public",
                    language="en",
                )
                active_outage.most_recent_post_id = post_result["id"]
            except mastodon.MastodonNotFoundError:
                print(
                    "The outage post couldn't be replied to, it was externally deleted."
                )

        active_outage.no_longer_in_response_time = datetime.now()
        session.commit()