495 lines
19 KiB
Python
495 lines
19 KiB
Python
import io
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import mastodon
|
|
import requests
|
|
import shapely
|
|
import yaml
|
|
from mastodon import Mastodon
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.exc import NoResultFound
|
|
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
|
|
from staticmap import Polygon, StaticMap
|
|
|
|
from geospatial import convert_outage_geometry
|
|
|
|
post_datetime_format = "%b %e %l:%M %p"
|
|
|
|
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"
|
|
scl_events_response = requests.get(scl_events_url)
|
|
try:
|
|
scl_events = scl_events_response.json()
|
|
except requests.JSONDecodeError:
|
|
print("JSON could not be loaded from SCL API")
|
|
raise
|
|
|
|
config = yaml.safe_load(open("config.yml"))
|
|
stadiamaps_api_key = config["stadiamaps"]["api_key"]
|
|
nominatim_url = config["nominatim"]["api_base_url"]
|
|
mastodon_client = Mastodon(
|
|
client_id=config["mastodon"]["client_id"],
|
|
client_secret=config["mastodon"]["client_secret"],
|
|
access_token=config["mastodon"]["access_token"],
|
|
api_base_url=config["mastodon"]["api_base_url"],
|
|
)
|
|
|
|
|
|
class AttribStaticMap(StaticMap, object):
|
|
def __init__(self, *args, **kwargs):
|
|
self.attribution = "© Stadia Maps © OpenMapTiles © OpenStreetMap"
|
|
super(AttribStaticMap, self).__init__(*args, **kwargs)
|
|
|
|
def _draw_features(self, image):
|
|
super(AttribStaticMap, self)._draw_features(image)
|
|
|
|
txt = Image.new("RGBA", image.size, (255, 255, 255, 0))
|
|
fnt = ImageFont.truetype("fonts/PublicSans-Regular.otf", 24)
|
|
d = ImageDraw.Draw(txt)
|
|
|
|
textSize = fnt.getbbox(self.attribution)
|
|
textPosition = (image.size[0] - textSize[2], image.size[1] - textSize[3])
|
|
offset = 2
|
|
options = {"fill": (255, 255, 255, 180)}
|
|
d.rectangle(
|
|
[
|
|
(textPosition[0] - (2 * offset), textPosition[1] - (2 * offset)),
|
|
(
|
|
textSize[2] + textPosition[0] + (2 * offset),
|
|
textSize[3] + textPosition[1] + (2 * offset),
|
|
),
|
|
],
|
|
**options,
|
|
)
|
|
|
|
# draw text, full opacity
|
|
d.text(
|
|
(textPosition[0] - offset, textPosition[1] - offset),
|
|
self.attribution,
|
|
font=fnt,
|
|
fill="black",
|
|
)
|
|
|
|
image.paste(txt, (0, 0), txt)
|
|
|
|
|
|
def classify_event_size(num_people: int) -> dict[str, str | bool]:
|
|
if num_people < 250:
|
|
return {
|
|
"size": "Small",
|
|
"outage_color": "#F97316",
|
|
"is_postable": False,
|
|
}
|
|
elif num_people < 1000:
|
|
return {
|
|
"size": "Medium",
|
|
"outage_color": "#EF4444",
|
|
"is_postable": True,
|
|
}
|
|
else:
|
|
return {
|
|
"size": "Large",
|
|
"outage_color": "#991B1B",
|
|
"is_postable": True,
|
|
}
|
|
|
|
|
|
def get_hashtag_string(event) -> str:
|
|
city = str()
|
|
try:
|
|
city = event["geoloc_city"]
|
|
except KeyError:
|
|
city = event["city"]
|
|
|
|
neighborhood_text = str()
|
|
try:
|
|
neighborhood = event["neighborhood"]
|
|
if neighborhood != city:
|
|
neighborhood_text = " #{}".format(neighborhood.title().replace(" ", ""))
|
|
except KeyError:
|
|
pass
|
|
|
|
hashtag_string = "#SeattleCityLightOutage #SCLOutage{} #{}".format(
|
|
neighborhood_text, city.title().replace(" ", "")
|
|
)
|
|
return hashtag_string
|
|
|
|
|
|
def do_initial_post(
|
|
event,
|
|
event_class,
|
|
start_time: datetime,
|
|
estimated_restoration_time: datetime,
|
|
outage_geometries: shapely.MultiPolygon,
|
|
) -> dict[str, str | None]:
|
|
post_id = None
|
|
map_media_post_id = None
|
|
# Fallback location from the SCL API in case one couldn't be reverse geocoded
|
|
area_text = event["city"]
|
|
try:
|
|
map = AttribStaticMap(
|
|
1024,
|
|
1024,
|
|
url_template="https://tiles.stadiamaps.com/tiles/outdoors/{z}/{x}/{y}@2x.png?api_key="
|
|
+ stadiamaps_api_key,
|
|
tile_size=512,
|
|
)
|
|
assert event["polygons"]["type"] == "polygon"
|
|
for ring in event["polygons"]["rings"]:
|
|
polygon = Polygon(
|
|
ring,
|
|
# Appending 7F to the fill_color makes it 50% transparent
|
|
fill_color="{}7F".format(event_class["outage_color"]),
|
|
outline_color=event_class["outage_color"],
|
|
simplify=True,
|
|
)
|
|
map.add_polygon(polygon)
|
|
|
|
try:
|
|
outage_center: shapely.Point = outage_geometries.centroid
|
|
|
|
assert outage_center.geom_type == "Point"
|
|
# Check to make sure the calculated lat and lon are sane enough
|
|
# NW Corner
|
|
assert outage_center.y < 48 and outage_center.x > -122.6
|
|
# SE Corner
|
|
assert outage_center.y > 47.2 and outage_center.x < -122
|
|
|
|
# Zoom level 17 ensures that we won't get any building/POI names, just street names
|
|
geocode_url = "{nominatim_url}/reverse?lat={lat}&lon={lon}&format=geocodejson&zoom=17".format(
|
|
nominatim_url=nominatim_url,
|
|
lat=outage_center.y,
|
|
lon=outage_center.x,
|
|
)
|
|
geocode_headers = {"User-Agent": "seattlecitylight-mastodon-bot"}
|
|
geocode_response = requests.get(geocode_url, headers=geocode_headers)
|
|
try:
|
|
geocode = geocode_response.json()
|
|
except requests.JSONDecodeError:
|
|
print("JSON could not be loaded from nominatim API")
|
|
raise
|
|
|
|
city = geocode["features"][0]["properties"]["geocoding"]["city"]
|
|
street = geocode["features"][0]["properties"]["geocoding"]["name"]
|
|
event["geoloc_city"] = city
|
|
|
|
if city != "Seattle":
|
|
city_not_seattle_text = " of {}".format(city)
|
|
else:
|
|
city_not_seattle_text = ""
|
|
|
|
if (
|
|
"locality" in geocode["features"][0]["properties"]["geocoding"]
|
|
and event_class["size"] != "Large"
|
|
):
|
|
locality = geocode["features"][0]["properties"]["geocoding"]["locality"]
|
|
if locality == "Uptown":
|
|
locality = "Lower Queen Anne"
|
|
|
|
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
|
|
street,
|
|
locality,
|
|
city_not_seattle_text,
|
|
)
|
|
area_text = "the {} area{}".format(locality, city_not_seattle_text)
|
|
event["neighborhood"] = locality
|
|
elif "district" in geocode["features"][0]["properties"]["geocoding"]:
|
|
district = geocode["features"][0]["properties"]["geocoding"]["district"]
|
|
alt_text = "A map showing the location of the outage, centered around {} in the {} area{}.".format(
|
|
street,
|
|
district,
|
|
city_not_seattle_text,
|
|
)
|
|
area_text = "the {} area{}".format(
|
|
district,
|
|
city_not_seattle_text,
|
|
)
|
|
event["neighborhood"] = district
|
|
else:
|
|
alt_text = "A map showing the location of the outage, centered around {} in {}.".format(
|
|
street,
|
|
city,
|
|
)
|
|
area_text = city
|
|
except Exception:
|
|
alt_text = "A map showing the location of the outage."
|
|
|
|
map_image = map.render()
|
|
|
|
with io.BytesIO() as map_image_file:
|
|
map_image.save(map_image_file, format="WebP", method=6)
|
|
map_media_post = mastodon_client.media_post(
|
|
map_image_file.getvalue(),
|
|
mime_type="image/png",
|
|
description=alt_text,
|
|
)
|
|
map_media_post_id = map_media_post["id"]
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
print(
|
|
"Ran into an issue with generating/uploading the map. Will post without it."
|
|
)
|
|
map_media_post = None
|
|
hashtag_string = get_hashtag_string(event)
|
|
|
|
est_restoration_post_text = str()
|
|
if estimated_restoration_time > datetime.now():
|
|
est_restoration_post_text = "\nEst. Restoration: {}\n".format(
|
|
estimated_restoration_time.strftime(post_datetime_format)
|
|
)
|
|
|
|
post_text = """Seattle City Light is reporting a {} outage in {}.
|
|
|
|
Start Date: {}{}
|
|
Cause: {}
|
|
|
|
{}""".format(
|
|
event_class["size"].lower(),
|
|
area_text,
|
|
start_time.strftime(post_datetime_format),
|
|
est_restoration_post_text,
|
|
event["cause"],
|
|
hashtag_string,
|
|
)
|
|
|
|
print(
|
|
"Posting the following to Mastodon, with a post length of {}:\n{}".format(
|
|
len(post_text), post_text
|
|
)
|
|
)
|
|
|
|
post_result = mastodon_client.status_post(
|
|
status=post_text,
|
|
media_ids=map_media_post,
|
|
visibility="public",
|
|
language="en",
|
|
)
|
|
post_id = post_result["id"]
|
|
|
|
return {"post_id": post_id, "map_media_post_id": map_media_post_id}
|
|
|
|
|
|
class Base(DeclarativeBase):
|
|
pass
|
|
|
|
|
|
class SclOutage(Base):
|
|
__tablename__ = "scl_outages"
|
|
scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
|
|
outage_user_id: Mapped[str] = mapped_column()
|
|
most_recent_post_id: Mapped[Optional[str]] = mapped_column()
|
|
initial_post_id: Mapped[Optional[str]] = mapped_column()
|
|
map_media_post_id: Mapped[Optional[str]] = mapped_column()
|
|
last_updated_time: Mapped[datetime] = mapped_column()
|
|
estimated_restoration_time: Mapped[datetime] = mapped_column()
|
|
cause: Mapped[str] = mapped_column()
|
|
status: Mapped[Optional[str]] = mapped_column()
|
|
no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()
|
|
start_time: Mapped[datetime] = mapped_column()
|
|
num_people: Mapped[int] = mapped_column()
|
|
max_num_people: Mapped[int] = mapped_column()
|
|
neighborhood: Mapped[Optional[str]] = mapped_column()
|
|
city: Mapped[Optional[str]] = mapped_column()
|
|
|
|
def __repr__(self) -> str:
|
|
return f"SclOutage(scl_outage_id={self.scl_outage_id!r}, most_recent_post_id={self.most_recent_post_id!r}, initial_post_id={self.initial_post_id!r}, map_media_post_id={self.map_media_post_id!r}, last_updated_time={self.last_updated_time!r}, no_longer_in_response_time={self.no_longer_in_response_time!r}, start_time={self.start_time!r}, num_people={self.num_people!r}, max_num_people={self.max_num_people!r}, neighborhood={self.neighborhood!r}, city={self.city!r})"
|
|
|
|
|
|
engine = create_engine("sqlite:///scl.db")
|
|
Base.metadata.create_all(engine)
|
|
|
|
with Session(engine) as session:
|
|
for event in scl_events:
|
|
print("Processing outage with Internal ID {}".format(event["id"]))
|
|
start_time = datetime.fromtimestamp(event["startTime"] / 1000)
|
|
last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
|
|
estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)
|
|
|
|
lookup_statement = select(SclOutage).where(
|
|
SclOutage.scl_outage_id == event["id"]
|
|
)
|
|
lookup_result = session.scalars(lookup_statement)
|
|
|
|
event_class = classify_event_size(event["numPeople"])
|
|
|
|
if "status" in event:
|
|
status = event["status"]
|
|
else:
|
|
status = None
|
|
|
|
outage_geometries = convert_outage_geometry(event)
|
|
|
|
try:
|
|
hashtag_string = get_hashtag_string(event)
|
|
existing_record = lookup_result.one()
|
|
updated_properties = []
|
|
updated_entries = []
|
|
if estimated_restoration_time != existing_record.estimated_restoration_time:
|
|
existing_record.estimated_restoration_time = estimated_restoration_time
|
|
if estimated_restoration_time > datetime.now():
|
|
# New estimated restoration time is in the future, so likely to be a real time
|
|
updated_properties.append("estimated restoration")
|
|
updated_entries.append(
|
|
"Est. Restoration: {}".format(
|
|
estimated_restoration_time.strftime(post_datetime_format)
|
|
)
|
|
)
|
|
if event["cause"] != existing_record.cause:
|
|
existing_record.cause = event["cause"]
|
|
updated_properties.append("cause")
|
|
updated_entries.append("Cause: {}".format(event["cause"]))
|
|
|
|
previous_event_class = classify_event_size(existing_record.num_people)
|
|
if event_class["size"] != previous_event_class["size"]:
|
|
updated_properties.append("outage size")
|
|
updated_entries.append("Outage Size: {}".format(event_class["size"]))
|
|
if status != existing_record.status:
|
|
existing_record.status = status
|
|
updated_properties.append("status")
|
|
updated_entries.append("Status: {}".format(status))
|
|
if existing_record.num_people != event["numPeople"]:
|
|
existing_record.num_people = event["numPeople"]
|
|
if existing_record.max_num_people < event["numPeople"]:
|
|
# Used to determine the maximum number of people affected by this outage, to determine if it's worth posting about
|
|
existing_record.max_num_people = event["numPeople"]
|
|
max_event_class = classify_event_size(existing_record.max_num_people)
|
|
|
|
if updated_properties:
|
|
updated_properties.sort()
|
|
updated_entries.sort()
|
|
if len(updated_properties) == 1:
|
|
updated_entries.insert(
|
|
0,
|
|
"The {} of this outage has been updated.\n".format(
|
|
updated_properties[0]
|
|
),
|
|
)
|
|
else:
|
|
# TODO: this currently just smashes all of the properties together with commas, it'd be nice to make it actually format it like a sentence
|
|
updated_entries.insert(
|
|
0,
|
|
"The {} of this outage have been updated.\n".format(
|
|
", ".join(updated_properties)
|
|
),
|
|
)
|
|
if max_event_class["is_postable"] and existing_record.initial_post_id:
|
|
try:
|
|
post_result = mastodon_client.status_post(
|
|
status="\n".join(updated_entries),
|
|
in_reply_to_id=existing_record.most_recent_post_id,
|
|
visibility="unlisted",
|
|
language="en",
|
|
)
|
|
existing_record.most_recent_post_id = post_result["id"]
|
|
except mastodon.MastodonNotFoundError:
|
|
print(
|
|
"Could not post a reply to the existing post, skip this update"
|
|
)
|
|
elif max_event_class["is_postable"]:
|
|
print(
|
|
"Posting an event that grew above the threshold required to post"
|
|
)
|
|
initial_post_result = do_initial_post(
|
|
event,
|
|
event_class,
|
|
start_time,
|
|
estimated_restoration_time,
|
|
outage_geometries,
|
|
)
|
|
try:
|
|
existing_record.neighborhood = initial_post_result[
|
|
"neighborhood"
|
|
]
|
|
except KeyError:
|
|
pass
|
|
|
|
try:
|
|
existing_record.city = initial_post_result["city"]
|
|
except KeyError:
|
|
pass
|
|
|
|
existing_record.initial_post_id = initial_post_result["post_id"]
|
|
existing_record.most_recent_post_id = initial_post_result["post_id"]
|
|
existing_record.map_media_post_id = initial_post_result[
|
|
"map_media_post_id"
|
|
]
|
|
session.commit()
|
|
|
|
except NoResultFound:
|
|
print("Existing record not found")
|
|
post_id = None
|
|
map_media_post_id = None
|
|
neighborhood = None
|
|
city = None
|
|
if not event_class["is_postable"]:
|
|
print(
|
|
"Outage is {} considered postable, will not post".format(
|
|
event_class["size"]
|
|
)
|
|
)
|
|
else:
|
|
initial_post_result = do_initial_post(
|
|
event,
|
|
event_class,
|
|
start_time,
|
|
estimated_restoration_time,
|
|
outage_geometries,
|
|
)
|
|
post_id = initial_post_result["post_id"]
|
|
map_media_post_id = initial_post_result["map_media_post_id"]
|
|
|
|
try:
|
|
neighborhood = initial_post_result["neighborhood"]
|
|
except KeyError:
|
|
pass
|
|
|
|
try:
|
|
city = initial_post_result["city"]
|
|
except KeyError:
|
|
pass
|
|
|
|
new_outage_record = SclOutage(
|
|
scl_outage_id=event["id"],
|
|
outage_user_id=event["identifier"],
|
|
most_recent_post_id=post_id,
|
|
initial_post_id=post_id,
|
|
map_media_post_id=map_media_post_id,
|
|
last_updated_time=last_updated_time,
|
|
estimated_restoration_time=estimated_restoration_time,
|
|
cause=event["cause"],
|
|
status=status,
|
|
start_time=start_time,
|
|
num_people=event["numPeople"],
|
|
max_num_people=event["numPeople"],
|
|
neighborhood=neighborhood,
|
|
city=city,
|
|
)
|
|
session.add(new_outage_record)
|
|
session.commit()
|
|
|
|
lookup_active_outages_statement = select(SclOutage).where(
|
|
SclOutage.no_longer_in_response_time == None # noqa: E711 - Syntax must stay this way for SQLAlchemy
|
|
)
|
|
for active_outage in session.scalars(lookup_active_outages_statement):
|
|
if (
|
|
not any(event["id"] == active_outage.scl_outage_id for event in scl_events)
|
|
and scl_events
|
|
):
|
|
# Event ID no longer exists in response
|
|
if active_outage.most_recent_post_id:
|
|
try:
|
|
post_result = mastodon_client.status_post(
|
|
status="This outage is no longer in the SCL feed, which usually means it's either been resolved, or split into multiple smaller outages.\n\n#SeattleCityLightOutage #SCLOutage",
|
|
in_reply_to_id=active_outage.most_recent_post_id,
|
|
visibility="public",
|
|
language="en",
|
|
)
|
|
active_outage.most_recent_post_id = post_result["id"]
|
|
except mastodon.MastodonNotFoundError:
|
|
print(
|
|
"The outage post couldn't be replied to, it was externally deleted."
|
|
)
|
|
active_outage.no_longer_in_response_time = datetime.now()
|
|
session.commit()
|