Merge remote-tracking branch 'origin/main' into refine-actions
koppor committed Nov 30, 2023
2 parents 627acc0 + ea0ea78 commit ee6c952
Showing 11 changed files with 382 additions and 40 deletions.
2 changes: 2 additions & 0 deletions in-vehicle-stack/Cargo.toml
@@ -12,13 +12,15 @@ members = [
"scenarios/smart_trailer_use_case/applications/smart_trailer_application",
"scenarios/smart_trailer_use_case/digital_twin_providers/common",
"scenarios/smart_trailer_use_case/digital_twin_providers/trailer_connected_provider",
"scenarios/smart_trailer_use_case/digital_twin_providers/trailer_fridge_connected_provider",
"scenarios/smart_trailer_use_case/digital_twin_providers/trailer_fridge_provider",
"scenarios/smart_trailer_use_case/digital_twin_providers/trailer_properties_provider",
"scenarios/smart_trailer_use_case/proto_build",
]

[workspace.dependencies]
env_logger= "0.10.0"
futures = "0.3.7"
log = "0.4.20"
paho-mqtt = "0.12"
parking_lot = "0.12.1"
@@ -13,4 +13,5 @@ interfaces = { path = "../../../../proto_build"}
paho-mqtt = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tonic = { workspace = true }
uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
futures = { workspace = true }
@@ -17,6 +17,7 @@ use digital_twin_providers_common::utils::{
discover_digital_twin_provider_using_ibeji, discover_service_using_chariott, get_uri,
};
use env_logger::{Builder, Target};
use interfaces::invehicle_digital_twin::v1::EndpointInfo;
use interfaces::module::managed_subscribe::v1::managed_subscribe_client::ManagedSubscribeClient;
use interfaces::module::managed_subscribe::v1::{
Constraint, SubscriptionInfoRequest, SubscriptionInfoResponse,
@@ -29,6 +30,8 @@ use tokio::time::{sleep, Duration};
use tonic::{Request, Status};
use uuid::Uuid;

use futures::{future, FutureExt}; // 0.3.5

const FREQUENCY_MS_FLAG: &str = "freq_ms=";
const MQTT_CLIENT_ID: &str = "smart-trailer-consumer";

@@ -66,12 +69,31 @@ async fn get_trailer_weight_subscription_info(
Ok(response.into_inner())
}

async fn get_trailer_temp_subscription_info(
managed_subscribe_uri: &str,
constraints: Vec<Constraint>,
) -> Result<SubscriptionInfoResponse, Status> {
// Create gRPC client.
let mut client = ManagedSubscribeClient::connect(managed_subscribe_uri.to_string())
.await
.map_err(|err| Status::from_error(err.into()))?;

let request = Request::new(SubscriptionInfoRequest {
entity_id: trailer_v1::trailer::trailer_temperature::ID.to_string(),
constraints,
});

let response = client.get_subscription_info(request).await?;

Ok(response.into_inner())
}

/// Receive Trailer Weight updates.
///
/// # Arguments
/// * `broker_uri` - The broker URI.
/// * `topic` - The topic.
async fn receive_trailer_weight_updates(
async fn receive_trailer_info_updates(
broker_uri: &str,
topic: &str,
) -> Result<JoinHandle<()>, String> {
Expand Down Expand Up @@ -124,7 +146,7 @@ async fn receive_trailer_weight_updates(
for msg in receiver.iter() {
if let Some(msg) = msg {
// Here we log the message received. This could be expanded to parsing the message,
// Obtaining the weight and making decisions based on the weight
// Obtaining the updates and making decisions based on them
// For example, adjusting body functions or powertrain of the towing vehicle.
info!("{}", msg);
} else if !client.is_connected() {
@@ -150,6 +172,76 @@ async fn receive_trailer_weight_updates(
Ok(sub_handle)
}

async fn read_weight(provider_weight_endpoint_info: Option<EndpointInfo>, frequency_constraint: Constraint) -> Result<(), Box<dyn std::error::Error>> {
if provider_weight_endpoint_info.is_some() {
let managed_weight_subscribe_uri = provider_weight_endpoint_info.ok_or("Maximum amount of retries was reached while trying to retrieve the digital twin provider.")?.uri;
info!("The Managed Subscribe URI for the TrailerWeight property's provider is {managed_weight_subscribe_uri}");

// Get the subscription information for a managed topic with constraints.
let subscription_weight_info =
get_trailer_weight_subscription_info(&managed_weight_subscribe_uri, vec![frequency_constraint.clone()])
.await?;

// Deconstruct subscription information.
let broker_weight_uri = get_uri(&subscription_weight_info.uri)?;
let topic_weight = subscription_weight_info.context;
info!("The broker URI for the TrailerWeight property's provider is {broker_weight_uri}");

// Subscribe to topic.
let sub_handle_weight = receive_trailer_info_updates(&broker_weight_uri, &topic_weight)
.await
.map_err(|err| Status::internal(format!("{err:?}")))?;

signal::ctrl_c().await?;

info!("The Consumer has completed. Shutting down...");

// Wait for subscriber task to cleanly shutdown.
_ = sub_handle_weight.await;

} else {
info!("The Managed Subscribe URI for the TrailerWeight property's provider is not found");
}

Ok(())
}

async fn read_temperature (provider_temp_endpoint_info: Option<EndpointInfo>, frequency_constraint: Constraint) -> Result<(), Box<dyn std::error::Error>>{

if provider_temp_endpoint_info.is_some() {
let managed_temp_subscribe_uri = provider_temp_endpoint_info.ok_or("Maximum amount of retries was reached while trying to retrieve the digital twin provider.")?.uri;
info!("The Managed Subscribe URI for the TrailerTemp property's provider is {managed_temp_subscribe_uri}");

let subscription_temp_info =
get_trailer_temp_subscription_info(&managed_temp_subscribe_uri, vec![frequency_constraint.clone()])
.await?;

// Deconstruct subscription information.
let broker_temp_uri = get_uri(&subscription_temp_info.uri)?;

let topic_temp = subscription_temp_info.context;
info!("The broker URI for the TrailerTemp property's provider is {broker_temp_uri}");

// Subscribe to topic.
let sub_handle_temp = receive_trailer_info_updates(&broker_temp_uri, &topic_temp)
.await
.map_err(|err| Status::internal(format!("{err:?}")))?;

signal::ctrl_c().await?;

info!("The Consumer has completed. Shutting down...");

// Wait for subscriber task to cleanly shutdown.
_ = sub_handle_temp.await;

} else {
info!("The Managed Subscribe URI for the TrailerTemp property's provider is not found");
}

Ok(())
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup logging.
@@ -184,10 +276,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap_or_else(|| DEFAULT_FREQUENCY_MS.to_string());

// Retrieve the provider URI.
let mut provider_endpoint_info = None;
let mut provider_weight_endpoint_info = None;
let mut provider_temp_endpoint_info = None;
let mut retries: i32 = 0;
while provider_endpoint_info.is_none() {
provider_endpoint_info = match discover_digital_twin_provider_using_ibeji(
while provider_weight_endpoint_info.is_none() || provider_temp_endpoint_info.is_none() {
provider_weight_endpoint_info = match discover_digital_twin_provider_using_ibeji(
&invehicle_digital_twin_uri,
trailer_v1::trailer::trailer_weight::ID,
digital_twin_protocol::GRPC,
@@ -206,7 +299,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};

if provider_endpoint_info.is_none() && retries < MAX_RETRIES {
provider_temp_endpoint_info = match discover_digital_twin_provider_using_ibeji(
&invehicle_digital_twin_uri,
trailer_v1::trailer::trailer_temperature::ID,
digital_twin_protocol::GRPC,
&[digital_twin_operation::MANAGEDSUBSCRIBE.to_string()],
)
.await
{
Ok(response) => Some(response),
Err(status) => {
info!(
"A provider was not found in the digital twin service for id '{}' with: '{:?}'",
trailer_v1::trailer::trailer_temperature::ID,
status
);
None
}
};

if (provider_weight_endpoint_info.is_none() && provider_temp_endpoint_info.is_none()) && retries < MAX_RETRIES {
info!("Retrying FindById to retrieve the properties provider endpoint in {DURATION_BETWEEN_ATTEMPTS:?}.");
sleep(DURATION_BETWEEN_ATTEMPTS).await;
retries += 1;
@@ -215,36 +327,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

let managed_subscribe_uri = provider_endpoint_info.ok_or("Maximum amount of retries was reached while trying to retrieve the digital twin provider.")?.uri;
info!("The Managed Subscribe URI for the TrailerWeight property's provider is {managed_subscribe_uri}");

// Create constraint for the managed subscribe call.
let frequency_constraint = Constraint {
r#type: constraint_type::FREQUENCY_MS.to_string(),
value: frequency_ms.to_string(),
};

// Get the subscription information for a managed topic with constraints.
let subscription_info =
get_trailer_weight_subscription_info(&managed_subscribe_uri, vec![frequency_constraint])
.await?;

// Deconstruct subscription information.
let broker_uri = get_uri(&subscription_info.uri)?;
let topic = subscription_info.context;
info!("The broker URI for the TrailerWeight property's provider is {broker_uri}");

// Subscribe to topic.
let sub_handle = receive_trailer_weight_updates(&broker_uri, &topic)
.await
.map_err(|err| Status::internal(format!("{err:?}")))?;

signal::ctrl_c().await?;

info!("The Consumer has completed. Shutting down...");

// Wait for subscriber task to cleanly shutdown.
_ = sub_handle.await;
let futures = vec![
read_weight(provider_weight_endpoint_info, frequency_constraint.clone()).boxed(),
read_temperature(provider_temp_endpoint_info, frequency_constraint.clone()).boxed()];
let _results = future::join_all(futures).await;

Ok(())
}
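
The rewritten main builds both readers and awaits them together with future::join_all. Because read_weight and read_temperature are distinct async fn types, each future is first converted into a BoxFuture with FutureExt::boxed so the two can share a single Vec. A minimal, self-contained sketch of that pattern, separate from the committed code (hypothetical task names task_a/task_b; assumes the tokio and futures crates):

use futures::{future, FutureExt};

// Two differently typed async fns; .boxed() erases their concrete types to
// BoxFuture<'static, u32> so both can be stored in one Vec and joined.
async fn task_a() -> u32 { 1 }
async fn task_b() -> u32 { 2 }

#[tokio::main]
async fn main() {
    let tasks = vec![task_a().boxed(), task_b().boxed()];
    // join_all polls every future concurrently and returns the outputs in order.
    let results = future::join_all(tasks).await;
    assert_eq!(results, vec![1, 2]);
}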
@@ -32,7 +32,7 @@ RUN sanitized=$(echo "${APP_NAME}" | tr -dc '^[a-zA-Z_0-9-]+$'); \
}

# Build the application
RUN --mount=type=cache,sharing=private,target=./target/release/ cargo build --release --bin "${APP_NAME}" && \
RUN --mount=type=cache,target=./target/release/ cargo build --release --bin "${APP_NAME}" && \
cp ./target/release/"${APP_NAME}" /sdv/service

################################################################################
@@ -69,5 +69,7 @@ COPY --from=build /sdv/service /sdv/
# Expose the port that the application listens on.
EXPOSE 4020

ENV TRAILER_TYPE=2

# What the container should run when it is started.
CMD ["/sdv/service"]
@@ -5,7 +5,7 @@
//! Module containing gRPC service implementation based on [`interfaces::digital_twin_get_provider.proto`].
//!
//! Provides a gRPC endpoint for getting if the trailer is connected
use log::debug;
use log::info;
use smart_trailer_interfaces::digital_twin_get_provider::v1::digital_twin_get_provider_server::DigitalTwinGetProvider;
use smart_trailer_interfaces::digital_twin_get_provider::v1::{GetRequest, GetResponse};
use tonic::{Request, Response, Status};
@@ -0,0 +1,18 @@
[package]
name = "trailer_fridge_connected_provider"
version = "0.1.0"
edition = "2021"
license = "MIT"

[dependencies]
digital-twin-model = { path = "../../digital-twin-model" }
digital_twin_providers_common = { path = "../common" }
env_logger = { workspace = true }
interfaces = { path = "../../../../proto_build"}
log = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
smart_trailer_interfaces = { path = "../../proto_build" }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tonic = { workspace = true }
@@ -0,0 +1,73 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
# SPDX-License-Identifier: MIT

# Comments are provided throughout this file to help you get started.
# If you need more help, visit the Dockerfile reference guide at
# https://docs.docker.com/engine/reference/builder/

################################################################################
# Create a stage for building the application.

ARG RUST_VERSION=1.72.1
FROM docker.io/library/rust:${RUST_VERSION}-slim-bullseye AS build
ARG APP_NAME=trailer_fridge_connected_provider
# Add Build dependencies.
RUN apt update -y && apt upgrade -y && apt install -y \
cmake \
libssl-dev \
pkg-config \
protobuf-compiler

WORKDIR /sdv

COPY ./ .


# Check that APP_NAME argument is valid.
RUN sanitized=$(echo "${APP_NAME}" | tr -dc '^[a-zA-Z_0-9-]+$'); \
[ "$sanitized" = "${APP_NAME}" ] || { \
echo "ARG 'APP_NAME' is invalid. APP_NAME='${APP_NAME}' sanitized='${sanitized}'"; \
exit 1; \
}

# Build the application
RUN --mount=type=cache,target=./target/release/ cargo build --release --bin "${APP_NAME}" && \
cp ./target/release/"${APP_NAME}" /sdv/service

################################################################################
# Create a new stage for running the application that contains the minimal
# runtime dependencies for the application. This often uses a different base
# image from the build stage where the necessary files are copied from the build
# stage.
#
# The example below uses the debian bullseye image as the foundation for running the app.
# By specifying the "bullseye-slim" tag, it will also use whatever happens to be the
# most recent version of that tag when you build your Dockerfile. If
# reproducibility is important, consider using a digest
# (e.g., debian@sha256:ac707220fbd7b67fc19b112cee8170b41a9e97f703f588b2cdbbcdcecdd8af57).
FROM docker.io/library/debian:bullseye-slim AS final

# Create a non-privileged user that the app will run under.
# See https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#user
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
USER appuser

WORKDIR /sdv

# Copy the executable from the "build" stage.
COPY --from=build /sdv/service /sdv/

# Expose the port that the application listens on.
EXPOSE 4020

# What the container should run when it is started.
CMD ["/sdv/service"]