seattlecitylight-mastodon-bot/scl.py

180 lines
7.1 KiB
Python
Raw Normal View History

2024-01-13 14:24:00 -08:00
from datetime import datetime
2024-01-13 15:01:11 -08:00
from typing import Optional
2024-01-13 14:24:00 -08:00
import requests
from mastodon import Mastodon
2024-01-13 15:01:11 -08:00
from sqlalchemy import create_engine, select
2024-01-13 14:24:00 -08:00
from sqlalchemy.exc import NoResultFound
2024-01-13 15:01:11 -08:00
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
2024-01-13 14:24:00 -08:00
# strftime pattern used for every timestamp shown in a post.
# NOTE(review): %e and %l are platform-specific strftime extensions and are
# not available everywhere — confirm against the deployment platform.
post_datetime_format = "%b %e %l:%M %p"
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"

# requests applies no timeout by default; without one a stalled connection
# would hang the script indefinitely.
scl_events_response = requests.get(scl_events_url, timeout=30)
# Stop on an HTTP error instead of treating an error payload as the event list.
scl_events_response.raise_for_status()
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

# NOTE(review): the access token argument looks like a credentials file name
# rather than a literal token — confirm that file exists next to the script.
mastodon = Mastodon(access_token="scl_bot_mastodon.secret")
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this file inherit from."""
class SclOutage(Base):
    """One tracked outage and the state of its Mastodon thread."""

    __tablename__ = "scl_outages"

    # Value of the event's "id" field in the API response.
    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    # Value of the event's "identifier" field; used to build the per-outage hashtag.
    outage_user_id: Mapped[str] = mapped_column()
    # ID of the latest post made for this outage; later posts reply to it.
    most_recent_post_id: Mapped[Optional[str]] = mapped_column()
    # Converted from the event's "lastUpdatedTime" field.
    last_updated_time: Mapped[datetime] = mapped_column()
    # Converted from the event's "etrTime" field.
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    # Value of the event's "cause" field.
    cause: Mapped[str] = mapped_column()
    # "Small", "Medium" or "Large", derived from the event's "numPeople" field.
    outage_size: Mapped[str] = mapped_column()
    # Value of the event's "status" field, or None when the event has none.
    status: Mapped[Optional[str]] = mapped_column()
    # Set once the outage's ID stops appearing in the API response.
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()

    def __repr__(self) -> str:
        """Return a debugging representation with the key tracking fields."""
        shown_fields = (
            "scl_outage_id",
            "most_recent_post_id",
            "last_updated_time",
            "no_longer_in_response_time",
        )
        rendered = ", ".join(
            f"{field_name}={getattr(self, field_name)!r}"
            for field_name in shown_fields
        )
        return f"SclOutage({rendered})"
2024-01-13 14:24:00 -08:00
def _classify_outage_size(num_people: int) -> str:
    """Bucket an affected-customer count into the size label used in posts."""
    if num_people < 250:
        return "Small"
    if num_people < 1000:
        return "Medium"
    return "Large"


def _outage_hashtags(outage_user_id: str) -> str:
    """Return the hashtag line appended to every post about one outage."""
    return f"#SeattleCityLightOutage #SCLOutage #SCLOutage{outage_user_id}"


# Synchronise the local outage table with the events fetched from the API:
# post a new thread for unseen outages, reply when a tracked property
# changes, and reply once more when an outage disappears from the response.
engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for event in scl_events:
        print(f"Processing outage with Internal ID {event['id']}")

        # Timestamps are divided by 1000 before conversion, i.e. the API
        # values are treated as epoch milliseconds.
        start_time = datetime.fromtimestamp(event["startTime"] / 1000)
        last_updated_time = datetime.fromtimestamp(event["lastUpdatedTime"] / 1000)
        estimated_restoration_time = datetime.fromtimestamp(event["etrTime"] / 1000)
        etr_text = estimated_restoration_time.strftime(post_datetime_format)

        outage_size = _classify_outage_size(event["numPeople"])
        # "status" is the only field the script treats as optional.
        status = event.get("status")
        hashtag_string = _outage_hashtags(event["identifier"])

        # Keep the try body to the one call that can raise NoResultFound so
        # unrelated errors from posting or committing are never misread as
        # "record not found".
        try:
            existing_record = session.scalars(
                select(SclOutage).where(SclOutage.scl_outage_id == event["id"])
            ).one()
        except NoResultFound:
            existing_record = None

        if existing_record is None:
            # First time this outage is seen: start a new thread and store it.
            print("Existing record not found")
            # NOTE(review): line layout reproduces the template as visible in
            # the reviewed source — confirm no blank lines were intended.
            post_text = "\n".join(
                [
                    "Seattle City Light is reporting a "
                    f"{outage_size.lower()} outage in {event['city']}.",
                    f"Start Date: {start_time.strftime(post_datetime_format)}",
                    f"Est. Restoration: {etr_text}",
                    f"Cause: {event['cause']}",
                    hashtag_string,
                ]
            )
            mastodon_post_result = mastodon.status_post(
                status=post_text,
                visibility="public",
            )
            session.add(
                SclOutage(
                    scl_outage_id=event["id"],
                    outage_user_id=event["identifier"],
                    most_recent_post_id=mastodon_post_result["id"],
                    last_updated_time=last_updated_time,
                    estimated_restoration_time=estimated_restoration_time,
                    cause=event["cause"],
                    status=status,
                    outage_size=outage_size,
                )
            )
            session.commit()
            continue

        # Known outage: collect every tracked property whose value changed.
        updated_properties = []
        updated_entries = []
        if estimated_restoration_time != existing_record.estimated_restoration_time:
            existing_record.estimated_restoration_time = estimated_restoration_time
            updated_properties.append("estimated restoration")
            updated_entries.append(f"Est. Restoration: {etr_text}")
        if event["cause"] != existing_record.cause:
            existing_record.cause = event["cause"]
            updated_properties.append("cause")
            updated_entries.append(f"Cause: {event['cause']}")
        if outage_size != existing_record.outage_size:
            existing_record.outage_size = outage_size
            updated_properties.append("outage size")
            updated_entries.append(f"Outage Size: {outage_size}")
        if status != existing_record.status:
            existing_record.status = status
            updated_properties.append("status")
            updated_entries.append(f"Status: {status}")

        if updated_properties:
            updated_properties.sort()
            updated_entries.sort()
            if len(updated_properties) == 1:
                headline = "The {} of this outage has been updated.\n".format(
                    updated_properties[0]
                )
            else:
                # TODO: this currently just smashes all of the properties
                # together with commas, it'd be nice to make it actually
                # format it like a sentence
                headline = "The {} of this outage have been updated.\n".format(
                    ", ".join(updated_properties)
                )
            post_lines = [headline, *updated_entries, "", hashtag_string]
            mastodon_post_result = mastodon.status_post(
                status="\n".join(post_lines),
                in_reply_to_id=existing_record.most_recent_post_id,
                visibility="public",
            )
            existing_record.most_recent_post_id = mastodon_post_result["id"]

        # Keep the stored timestamp in step with the API; it was previously
        # written only when the row was first created.
        existing_record.last_updated_time = last_updated_time
        session.commit()

    # Any stored outage not yet marked resolved whose ID is absent from the
    # current response is announced as resolved. The ID set is built once
    # rather than rescanning the event list for every stored outage.
    current_event_ids = {event["id"] for event in scl_events}
    active_outages = session.scalars(
        select(SclOutage).where(SclOutage.no_longer_in_response_time.is_(None))
    ).all()  # materialised up front because the loop below commits
    for active_outage in active_outages:
        if active_outage.scl_outage_id in current_event_ids:
            continue
        # Event ID no longer exists in response
        mastodon_post_result = mastodon.status_post(
            status="This outage is reported to be resolved.\n\n"
            + _outage_hashtags(active_outage.outage_user_id),
            in_reply_to_id=active_outage.most_recent_post_id,
            visibility="public",
        )
        active_outage.most_recent_post_id = mastodon_post_result["id"]
        active_outage.no_longer_in_response_time = datetime.now()
        # Commit per outage so a later posting failure cannot cause this
        # resolution to be announced again on the next run.
        session.commit()