1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#[allow(unused_imports, reason = "services")]
use crate::{service::*, *};

pub(crate) struct Process {
  #[allow(dead_code, reason = "process")]
  config: config::Manager,

  #[allow(dead_code, reason = "process")]
  services: service::Container,
}

impl Process {
  pub(crate) fn new(
    config: config::Manager,
    services: service::Container,
  ) -> Self {
    Self { config, services }
  }
}

impl super::Process for Process {}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PidgeonHealth {
  temperature: f32,
}

#[async_trait::async_trait]
impl process::Recurring for Process {
  async fn execute(&self) -> anyhow::Result<()> {
    let config = self.config.values().await;

    let last_pushed_id =
      match self.services.db().get_last_successful_update_log().await? {
        Some(db::Log {
          last: Some(last), ..
        }) => last,
        _ => 0,
      };

    let mut health_to_update = self
      .services
      .db()
      .get_health(last_pushed_id, config.cloud.message_limit)
      .await?;
    let health_len = health_to_update.len();

    let last_push_id =
      match health_to_update.iter().max_by(|x, y| x.id.cmp(&y.id)) {
        Some(health) => health.id,
        None => return Ok(()),
      };

    let result = self
      .services
      .cloud()
      .update(
        serde_json::Value::Null,
        health_to_update
          .drain(0..)
          .map(|health| cloud::Health {
            device_id: health.source,
            timestamp: health.timestamp,
            data: serde_json::json!(health.data),
          })
          .collect(),
      )
      .await;

    let (log_status, log_response) = match result {
      Ok(cloud::Response {
        success: true,
        text,
        ..
      }) => {
        tracing::info!(
          "Successfully updated {:?} health from {:?} to {:?}",
          health_len,
          last_pushed_id,
          last_push_id
        );
        (db::LogStatus::Success, text)
      }
      Ok(cloud::Response {
        success: false,
        text,
        code,
      }) => {
        tracing::error!(
          "Failed updating {:?} health from {:?} to {:?} with code {:?}",
          health_len,
          last_pushed_id,
          last_push_id,
          code
        );
        (db::LogStatus::Failure, text)
      }
      Err(error) => {
        tracing::error!(
          "Failed pushing {:?} health from {:?} to {:?} {}",
          health_len,
          last_pushed_id,
          last_push_id,
          error
        );
        (db::LogStatus::Failure, error.to_string())
      }
    };
    let log = db::Log {
      id: 0,
      timestamp: chrono::Utc::now(),
      last: Some(last_push_id),
      status: log_status,
      kind: db::LogKind::Update,
      response: serde_json::Value::String(log_response),
    };
    self.services.db().insert_log(log).await?;

    Ok(())
  }
}