1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use futures::future::join_all;

use modbus::Time;

#[allow(unused_imports, reason = "services")]
use crate::{service::*, *};

/// Recurring process that writes the time payload to configured modbus
/// devices.
pub(crate) struct Process {
  /// Source of the configuration values read at the start of every run.
  #[allow(dead_code, reason = "process")]
  config: config::Manager,

  /// Service container used to reach the database and modbus services.
  #[allow(dead_code, reason = "process")]
  services: service::Container,
}

impl Process {
  /// Creates the process from its two collaborators.
  ///
  /// Both arguments are stored as-is; no work is performed here.
  pub(crate) fn new(
    config: config::Manager,
    services: service::Container,
  ) -> Self {
    Self { config, services }
  }
}

// Empty impl: marks this type as a `super::Process` without defining any
// trait items here.
impl super::Process for Process {}

#[async_trait::async_trait]
impl process::Recurring for Process {
  /// Writes the time payload to every database device whose kind has a time
  /// implementation configured.
  ///
  /// Devices are paired with the first configuration entry of the same kind
  /// that carries a `time` value; devices without such an entry are skipped.
  /// All writes are driven concurrently through `join_all`.
  ///
  /// # Errors
  ///
  /// Returns an error only when loading the devices from the database
  /// fails. Failures of individual device writes are logged and do not
  /// abort the run.
  #[tracing::instrument(skip(self))]
  async fn execute(&self) -> anyhow::Result<()> {
    let config = self.config.values().await;
    let timeout = config.modbus.time_timeout;

    let db_devices = self.services.db().get_devices().await?;

    join_all(
      db_devices
        .into_iter()
        .filter_map(|device| {
          // `find_map` yields the first `Some(time)` among the configs of
          // this device kind, so no `unwrap` (and no lint suppression) is
          // needed to get at the value.
          let time = config
            .modbus
            .devices
            .values()
            .filter(|device_config| device_config.kind == device.kind)
            .find_map(|device_config| device_config.time)?;

          Some(Device {
            id: device.id,
            time,
          })
        })
        .map(|device| async move {
          // Each write reports its own outcome so that one failing device
          // does not prevent the others from being written.
          match self.write_to_device(&device, timeout).await {
            Err(error) => {
              tracing::error! {
                %error,
                "Failed writing nightly to device {}",
                &device.id
              }
            }
            Ok(_) => {
              tracing::info! {
                "Wrote nightly to device {}",
                &device.id
              }
            }
          }
        }),
    )
    .await;

    Ok(())
  }
}

/// Failure modes of a single time write performed by `write_to_device`.
#[derive(Debug, thiserror::Error)]
enum TimeWriteError {
  /// The modbus write itself failed; converted from
  /// `modbus::DeviceWriteError` via `From`.
  #[error("Failed writing to device")]
  DeviceWrite(#[from] modbus::DeviceWriteError),

  /// The write did not finish in time.
  ///
  /// NOTE(review): every `std::io::Error` converts into this variant;
  /// presumably the only source is the `.timeout(..)` wrapper around the
  /// write - confirm.
  #[error("Writing to device timed out")]
  Timeout(#[from] std::io::Error),
}

impl Process {
  /// Sends the time payload for `device` through the modbus service,
  /// bounded by `timeout`.
  ///
  /// # Errors
  ///
  /// Returns a `TimeWriteError` when either layer of the awaited result is
  /// an error: the one produced by the `.timeout(..)` wrapper or the one
  /// produced by the write itself.
  async fn write_to_device(
    &self,
    device: &Device,
    timeout: chrono::Duration,
  ) -> Result<(), TimeWriteError> {
    // Convert the configured limit once, before building the write future.
    let limit = timeout_from_chrono(timeout);

    let outcome = self
      .services
      .modbus()
      .write_to_id(
        &device.id,
        [Box::new(
          modbus::time::implementation_for(device.time).create(),
        )],
      )
      .timeout(limit)
      .await;

    // Two nested results: both layers are propagated as `TimeWriteError`.
    outcome??;

    Ok(())
  }
}

/// A device selected for a time write: its identifier plus the time
/// implementation taken from the matching configuration entry.
#[derive(Clone, Debug)]
struct Device {
  /// Identifier passed to the modbus service when writing.
  id: String,
  /// Time implementation used to build the payload for this device.
  time: modbus::TimeImplementation,
}

/// Converts a `chrono` duration into the `futures_time` duration expected by
/// the `.timeout(..)` combinator, with millisecond precision.
///
/// `chrono::Duration` is signed while the target type is not. A negative
/// input is clamped to zero instead of being reinterpreted as an enormous
/// unsigned value, which would effectively disable the timeout.
fn timeout_from_chrono(
  timeout: chrono::Duration,
) -> futures_time::time::Duration {
  // `max(0)` guarantees a non-negative value, so `unsigned_abs` is a
  // lossless i64 -> u64 conversion here.
  let millis = timeout.num_milliseconds().max(0).unsigned_abs();

  futures_time::time::Duration::from_millis(millis)
}