mod appointment; mod bot; mod db; mod error; mod schema; use std::time::Duration; use std::{env, fs::File, io::BufReader, sync::Arc}; use anyhow::Result; use async_mutex::Mutex; use bot::fetch_and_announce_appointment; use chrono::{DateTime, NaiveDateTime, NaiveTime, TimeZone, Utc}; use chrono_tz::Europe; use db::ChatInfo; use diesel::result::Error::NotFound; use diesel::{Connection, RunQueryDsl, SqliteConnection}; use diesel::{ExpressionMethods, QueryDsl}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use error::ConfigLoadError; use log::*; use serde::{de::Error, Deserialize, Deserializer}; use teloxide::adaptors::Throttle; use teloxide::prelude::RequesterExt; use teloxide::requests::Requester; use teloxide::types::ChatId; use teloxide::{adaptors::throttle::Limits, Bot}; use tokio::time::sleep; use crate::db::DbChat; #[macro_use] extern crate rust_i18n; i18n!("locales"); #[derive(Deserialize, Debug)] pub struct Config { token: String, data_path: String, poll_interval: u64, #[serde(deserialize_with = "deserialize_time")] reminder_time: NaiveTime, } fn deserialize_time<'de, D: Deserializer<'de>>(deserializer: D) -> Result { let s: String = Deserialize::deserialize(deserializer)?; NaiveTime::parse_from_str(&s, "%H:%M").map_err(D::Error::custom) } impl Config { fn load() -> Result { let env_var_name = "CALENDAR_BOT_CONFIG_FILE"; let default_filename = "./config.yaml"; let path = env::var(env_var_name).unwrap_or_else(|_| { warn!( "Cannot read env var '{}', assuming '{}'", env_var_name, default_filename ); default_filename.to_owned() }); info!("Reading configuration from {}", path); let file = File::open(path).map_err(ConfigLoadError::OpenFailed)?; let reader = BufReader::new(file); serde_yaml::from_reader(reader).map_err(ConfigLoadError::ReadError) } } const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); pub type Database = Arc>; #[tokio::main(flavor = "current_thread")] async fn main() { pretty_env_logger::init(); let config = Config::load().unwrap(); info!("Connecting to database {}", config.data_path); let mut db = SqliteConnection::establish(&config.data_path).unwrap(); db.run_pending_migrations(MIGRATIONS).unwrap(); let db = Arc::new(Mutex::new(db)); let bot = Bot::new(config.token).throttle(Limits::default()); { let db = db.clone(); let bot = bot.clone(); let poll_duration = Duration::from_secs(config.poll_interval); tokio::task::spawn(async move { loop { let now = Utc::now(); let next_appointment = db.lock().await.transaction(|db| { use schema::chat::dsl::*; chat.select(next_appointment_start) .filter(next_appointment_start.is_not_null()) .filter(next_appointment_start.gt(now.timestamp())) .order(next_appointment_start.asc()) .first::>(db) }); let next_appointment = match next_appointment { Err(NotFound) => None, Ok(appointment) => appointment, Err(e) => Err(e).unwrap(), }; let sleep_duration = next_appointment .map(|timestamp| NaiveDateTime::from_timestamp(timestamp, 0)) .map(|naive_date_time| DateTime::::from_utc(naive_date_time, Utc)) .map(|date_time| date_time - now) .map(|duration| duration.to_std().unwrap()) .filter(|duration| *duration < poll_duration) .unwrap_or(poll_duration); sleep(sleep_duration).await; // TODO Log the error and continue instead check_task(&bot, config.reminder_time, &db).await.unwrap(); } }); } tokio::time::sleep(Duration::from_secs(10)).await; bot::spawn(bot, db).await; } struct Reminder { time: DateTime, text: String, } async fn check_task(bot: &Throttle, reminder_time: NaiveTime, db: &Database) -> Result<()> { let chats = db.lock().await.transaction(|db| { use schema::chat::dsl::*; chat.load::(db) })?; let now = Utc::now().with_timezone(&Europe::Berlin); let today = now.date_naive(); for chat in chats { let mut chat_info = ChatInfo::from(chat); fetch_and_announce_appointment(bot, &mut chat_info, db).await?; let appointment = match chat_info.next_appointment { Some(appointment) => appointment, None => continue, }; let appointment = appointment.with_timezone(&Europe::Berlin); if appointment.start.date_naive() != today { continue; } let mut reminder = None; if now >= appointment.start { reminder = Some(Reminder { time: appointment.start, text: t!("messages.starting_now", locale = &chat_info.locale), }); } else { let reminder_date_time = now.date().and_time(reminder_time).unwrap(); if now >= reminder_date_time { reminder = Some(Reminder { time: reminder_date_time, text: t!( "messages.appointment_today", locale = &chat_info.locale, start_time = &appointment.start.format("%H:%M").to_string() ), }) } } if let Some(reminder) = reminder { if chat_info.last_reminder.is_some() && chat_info.last_reminder.unwrap() >= reminder.time { continue; } bot.send_message(ChatId(chat_info.id), reminder.text) .await?; db.lock().await.transaction(|db| { use schema::chat::dsl::*; diesel::update(chat.filter(telegram_id.eq(chat_info.id))) .set((last_reminder.eq(now.timestamp()),)) .execute(db) })?; } } Ok(()) }