This repository has been archived by the owner on Mar 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflask_rclpy.py
65 lines (51 loc) · 1.66 KB
/
flask_rclpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import rclpy
import signal
from rclpy.node import Node
from std_msgs.msg import String
import threading
from flask import Flask
class TestPublisher(Node):
def __init__(self):
super().__init__('test_publisher')
self.publisher = self.create_publisher(String, 'flask_pub_topic', 10)
self.subscription = self.create_subscription(
String,
'/topic',
self.chatter_callback,
10)
self.latest_message = None
def chatter_callback(self, msg):
print(f'chatter cb received: {msg.data}')
self.latest_message = msg.data
def publish_message(self):
msg = String()
msg.data = 'hello, world!'
self.publisher.publish(msg)
def ros2_thread(node):
print('entering ros2 thread')
rclpy.spin(node)
print('leaving ros2 thread')
def sigint_handler(signal, frame):
"""
SIGINT handler
We have to know when to tell rclpy to shut down, because
it's in a child thread which would stall the main thread
shutdown sequence. So we use this handler to call
rclpy.shutdown() and then call the previously-installed
SIGINT handler for Flask
"""
rclpy.shutdown()
if prev_sigint_handler is not None:
prev_sigint_handler(signal)
rclpy.init(args=None)
ros2_node = TestPublisher()
app = Flask(__name__)
threading.Thread(target=ros2_thread, args=[ros2_node]).start()
prev_sigint_handler = signal.signal(signal.SIGINT, sigint_handler)
@app.route('/latest_message')
def get_current_time():
return {'message': ros2_node.latest_message}
@app.route('/publish_message')
def get_publish_message():
ros2_node.publish_message()
return {}