"""Post Seattle City Light (SCL) outage events to Mastodon.

Each run fetches the current outage events from the SCL events endpoint and:

* posts a new status for every event that has no row in ``scl.db`` yet,
* replies to the stored thread when the estimated restoration time, cause,
  outage size or status of a known event differs from the stored row,
* replies with a resolution notice for every stored outage that is still
  marked active but is no longer present in the response.
"""

from datetime import datetime
import sqlite3
from typing import List
from typing import Optional

from bs4 import BeautifulSoup
from mastodon import Mastodon
import requests
from sqlalchemy import ForeignKey, select
from sqlalchemy import String
from sqlalchemy import create_engine
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import Session
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship

# NOTE(review): "%e" and "%l" are not among the strftime directives the Python
# documentation lists as portable; they depend on the platform's C library.
# Confirm the deployment platform supports them before moving this script.
post_datetime_format = "%b %e %l:%M %p"
scl_events_url = "https://utilisocial.io/datacapable/v2/p/scl/map/events"

# Upper bound (seconds) on the API request so an unattended run cannot hang
# forever on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 30

scl_events_response = requests.get(scl_events_url, timeout=REQUEST_TIMEOUT_SECONDS)
# Fail loudly on HTTP errors instead of trying to interpret an error body as
# the event list.
scl_events_response.raise_for_status()
try:
    scl_events = scl_events_response.json()
except requests.JSONDecodeError:
    print("JSON could not be loaded from SCL API")
    raise

mastodon = Mastodon(access_token="scl_bot_mastodon.secret")


class Base(DeclarativeBase):
    """Declarative base for the ORM models in this script."""


class SclOutage(Base):
    """Stored state of one outage and of the Mastodon thread posted about it."""

    __tablename__ = "scl_outages"

    # "id" of the event as returned by the API.
    scl_outage_id: Mapped[int] = mapped_column(primary_key=True, unique=True)
    # "identifier" of the event as returned by the API; used in the hashtag.
    outage_user_id: Mapped[str] = mapped_column()
    # Id of the latest Mastodon status in the thread; replies attach to it.
    most_recent_post_id: Mapped[str] = mapped_column()
    last_updated_time: Mapped[datetime] = mapped_column()
    estimated_restoration_time: Mapped[datetime] = mapped_column()
    cause: Mapped[str] = mapped_column()
    outage_size: Mapped[str] = mapped_column()
    status: Mapped[Optional[str]] = mapped_column()
    # If the event is no longer being returned in the response, this will be
    # set to the current time
    no_longer_in_response_time: Mapped[Optional[datetime]] = mapped_column()

    def __repr__(self) -> str:
        return (
            f"SclOutage(scl_outage_id={self.scl_outage_id!r}, "
            f"most_recent_post_id={self.most_recent_post_id!r}, "
            f"last_updated_time={self.last_updated_time!r}, "
            f"no_longer_in_response_time={self.no_longer_in_response_time!r})"
        )


def _from_epoch_millis(millis) -> datetime:
    """Convert a millisecond epoch timestamp to a naive local ``datetime``."""
    return datetime.fromtimestamp(millis / 1000)


def _classify_outage_size(num_people) -> str:
    """Bucket an affected-customer count into "Small", "Medium" or "Large"."""
    if num_people < 250:
        return "Small"
    if num_people < 1000:
        return "Medium"
    return "Large"


def _hashtags(outage_user_id) -> str:
    """Return the hashtag line appended to every post about an outage."""
    return "#SeattleCityLightOutage #SCLOutage #SCLOutage{}".format(outage_user_id)


engine = create_engine("sqlite:///scl.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    for event in scl_events:
        print("Processing outage with Internal ID {}".format(event["id"]))

        start_time = _from_epoch_millis(event["startTime"])
        last_updated_time = _from_epoch_millis(event["lastUpdatedTime"])
        estimated_restoration_time = _from_epoch_millis(event["etrTime"])
        outage_size = _classify_outage_size(event["numPeople"])
        # "status" is optional in the event payload.
        status = event.get("status")
        hashtag_string = _hashtags(event["identifier"])

        lookup_statement = select(SclOutage).where(
            SclOutage.scl_outage_id == event["id"]
        )
        # Only the lookup can raise NoResultFound, so keep the try body minimal.
        try:
            existing_record = session.scalars(lookup_statement).one()
        except NoResultFound:
            existing_record = None

        if existing_record is None:
            # First time this event is seen: open a new thread and store it.
            print("Existing record not found")
            # One field per line with a blank line before the hashtags, the
            # same layout the update and resolution posts use.
            post_text = "\n".join(
                [
                    "Seattle City Light is reporting a {} outage in {}.".format(
                        outage_size.lower(), event["city"]
                    ),
                    "",
                    "Start Date: {}".format(
                        start_time.strftime(post_datetime_format)
                    ),
                    "Est. Restoration: {}".format(
                        estimated_restoration_time.strftime(post_datetime_format)
                    ),
                    "Cause: {}".format(event["cause"]),
                    "",
                    hashtag_string,
                ]
            )
            mastodon_post_result = mastodon.status_post(
                status=post_text,
                visibility="public",
            )
            new_outage_record = SclOutage(
                scl_outage_id=event["id"],
                outage_user_id=event["identifier"],
                most_recent_post_id=mastodon_post_result["id"],
                last_updated_time=last_updated_time,
                estimated_restoration_time=estimated_restoration_time,
                cause=event["cause"],
                status=status,
                outage_size=outage_size,
            )
            session.add(new_outage_record)
            session.commit()
            continue

        # Known event: collect every tracked field that changed since the
        # stored row was written, updating the row as we go.
        updated_properties = []
        updated_entries = []

        if estimated_restoration_time != existing_record.estimated_restoration_time:
            existing_record.estimated_restoration_time = estimated_restoration_time
            updated_properties.append("estimated restoration")
            updated_entries.append(
                "Est. Restoration: {}".format(
                    estimated_restoration_time.strftime(post_datetime_format)
                )
            )

        if event["cause"] != existing_record.cause:
            existing_record.cause = event["cause"]
            updated_properties.append("cause")
            updated_entries.append("Cause: {}".format(event["cause"]))

        if outage_size != existing_record.outage_size:
            existing_record.outage_size = outage_size
            updated_properties.append("outage size")
            updated_entries.append("Outage Size: {}".format(outage_size))

        if status != existing_record.status:
            existing_record.status = status
            updated_properties.append("status")
            updated_entries.append("Status: {}".format(status))

        # Keep the stored timestamp in step with the API; previously it was
        # written once at insert time and never refreshed.
        if last_updated_time != existing_record.last_updated_time:
            existing_record.last_updated_time = last_updated_time

        if updated_properties:
            updated_properties.sort()
            updated_entries.sort()

            if len(updated_properties) == 1:
                headline = "The {} of this outage has been updated.\n".format(
                    updated_properties[0]
                )
            else:
                # TODO: this currently just smashes all of the properties
                # together with commas, it'd be nice to make it actually
                # format it like a sentence
                headline = "The {} of this outage have been updated.\n".format(
                    ", ".join(updated_properties)
                )
            updated_entries.insert(0, headline)

            updated_entries.append("")
            updated_entries.append(hashtag_string)

            mastodon_post_result = mastodon.status_post(
                status="\n".join(updated_entries),
                in_reply_to_id=existing_record.most_recent_post_id,
                visibility="public",
            )
            existing_record.most_recent_post_id = mastodon_post_result["id"]

        # Persist field changes (and the refreshed timestamp) for this event.
        session.commit()

    # Build the id set once instead of rescanning the response per stored row.
    current_event_ids = {event["id"] for event in scl_events}

    lookup_active_outages_statement = select(SclOutage).where(
        SclOutage.no_longer_in_response_time.is_(None)
    )

    for active_outage in session.scalars(lookup_active_outages_statement):
        if active_outage.scl_outage_id in current_event_ids:
            continue

        # Event ID no longer exists in response
        mastodon_post_result = mastodon.status_post(
            status="This outage is reported to be resolved.\n\n"
            + _hashtags(active_outage.outage_user_id),
            in_reply_to_id=active_outage.most_recent_post_id,
            visibility="public",
        )
        active_outage.most_recent_post_id = mastodon_post_result["id"]
        active_outage.no_longer_in_response_time = datetime.now()
        session.commit()