Skip to content

Commit a9e54fe

Browse files
author
Michael Zechmair
committed
IBA
1 parent 74cc8cf commit a9e54fe

File tree

5 files changed

+188
-0
lines changed

5 files changed

+188
-0
lines changed

hbp_nrp_cle/hbp_nrp_cle/cle/DeterministicClosedLoopEngine.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(self,
6060
brain_control_adapter,
6161
brain_comm_adapter,
6262
transfer_function_manager,
63+
external_module_array,
6364
dt
6465
):
6566
"""
@@ -78,6 +79,7 @@ def __init__(self,
7879
self.bca = brain_control_adapter
7980
self.bcm = brain_comm_adapter
8081
self.tfm = transfer_function_manager
82+
self.ema = external_module_array
8183
# default timestep
8284
self.timestep = dt
8385

@@ -130,6 +132,7 @@ def initialize(self, brain_file=None, **configuration):
130132
self.rca.initialize()
131133
self.bca.initialize()
132134
self.tfm.initialize('tfnode')
135+
self.ema.initialize()
133136
cle.clock = 0.0
134137
self.start_time = 0.0
135138
self.elapsed_time = 0.0
@@ -229,6 +232,8 @@ def run_step(self, timestep):
229232
# self.tfm.run_neuron_to_robot(clk)
230233
self.tfm.run_tfs(clk)
231234

235+
self.ema.run_step()
236+
232237
# update clock
233238
cle.clock += timestep
234239

@@ -245,6 +250,7 @@ def shutdown(self):
245250
self.bcm.shutdown()
246251
self.rca.shutdown()
247252
self.bca.shutdown()
253+
self.ema.shutdown()
248254

249255
def start(self):
250256
"""
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import print_function
2+
3+
import rospy
4+
from concurrent.futures import ThreadPoolExecutor
5+
6+
class AsyncServiceProxy(object):
7+
8+
def __init__(self, service_name, service_type, persistent=True,
9+
headers=None, callback=None):
10+
"""Create an asynchronous service proxy."""
11+
12+
self.executor = ThreadPoolExecutor(max_workers=1)
13+
self.service_proxy = rospy.ServiceProxy(
14+
service_name,
15+
service_type,
16+
persistent,
17+
headers)
18+
self.callback = callback
19+
20+
def __call__(self, *args, **kwargs):
21+
"""Get a Future corresponding to a call of this service."""
22+
23+
fut = self.executor.submit(self.service_proxy.call, *args, **kwargs)
24+
if self.callback is not None:
25+
fut.add_done_callback(self.callback)
26+
27+
return fut
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
ExternalModule.py includes the corresponding CLE class for external ROS modules.
3+
"""
4+
5+
__author__ = 'Omer Yilmaz'
6+
7+
import os
8+
import threading
9+
from multiprocessing import Value
10+
import logging
11+
import rospy
12+
from hbp_nrp_cle.externalsim.AsyncEmaCall import AsyncServiceProxy
13+
from cle_ros_msgs.srv import Initialize, RunStep, RunStepRequest, Shutdown
14+
15+
logger = logging.getLogger('hbp_nrp_cle')
16+
17+
18+
class ExternalModule(object):
19+
"""
20+
External ROS modules have initialize, run_step and shutdown methods.
21+
This class has the corresponding initialize, run_step and shutdown
22+
methods which triggers the external ones through ROS service proxies.
23+
Objects of this class is synchronized with the Deterministic Closed
24+
Loop Engine via ExternalModuleManager.
25+
"""
26+
27+
def __init__(self, module_name):
28+
self.service_name = 'emi/' + module_name + '_module/'
29+
self.resp = None
30+
31+
rospy.wait_for_service(self.service_name + 'initialize')
32+
self.initialize_proxy = AsyncServiceProxy(
33+
self.service_name + 'initialize', Initialize, persistent=False)
34+
35+
rospy.wait_for_service(self.service_name + 'run_step')
36+
self.run_step_proxy = AsyncServiceProxy(
37+
self.service_name + 'run_step', RunStep, persistent=True)
38+
39+
rospy.wait_for_service(self.service_name + 'shutdown')
40+
self.shutdown_proxy = AsyncServiceProxy(
41+
self.service_name + 'shutdown', Shutdown, persistent=False)
42+
43+
def initialize(self):
44+
"""
45+
This method triggers the initialize method served at the external module synchronously
46+
with the CLE.
47+
"""
48+
try:
49+
self.resp = self.initialize_proxy()
50+
return self.resp
51+
except rospy.ServiceException as e:
52+
logger.exception(self.service_name + 'initialize call failed: %s' % e)
53+
54+
def run_step(self):
55+
"""
56+
This method triggers the run_step method served at the external module synchronously
57+
with the CLE.
58+
"""
59+
try:
60+
fut = self.run_step_proxy()
61+
return fut
62+
except rospy.ServiceException as e:
63+
logger.exception(self.service_name + 'run_step call failed: %s' % e)
64+
65+
def shutdown(self):
66+
"""
67+
This method triggers the shutdown method served at the external module synchronously
68+
with the CLE.
69+
"""
70+
try:
71+
fut = self.shutdown_proxy()
72+
return fut
73+
except rospy.ServiceException as e:
74+
logger.exception(self.service_name + 'shutdown call failed: %s' % e)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""
2+
The manager for the external modules which extend the NRP through ROS launch
3+
mechanism.
4+
"""
5+
6+
__author__ = 'Omer Yilmaz'
7+
8+
import concurrent.futures
9+
from multiprocessing import Process, Pool, cpu_count
10+
import re
11+
import rosservice
12+
from hbp_nrp_cle.externalsim.ExternalModule import ExternalModule
13+
14+
15+
class ExternalModuleManager(object):
16+
"""
17+
This class automatically detects the external modules searching the ROS
18+
services available at the ROS server. It keeps and array of the external
19+
modules and calls initialize, run_step ans shutdown methods for each
20+
external module. One object of this class is used by the Deterministic
21+
Closed Loop Engine and is synchronized with it making every external module
22+
on the array also synchronized.
23+
"""
24+
25+
def __init__(self):
26+
self.module_names = []
27+
for service in rosservice.get_service_list():
28+
m = re.match(r"/emi/.*/initialize", str(service))
29+
if m:
30+
module_name = m.group(0)[5:-18]
31+
self.module_names.append(module_name)
32+
33+
self.ema = []
34+
if len(self.module_names) is not 0:
35+
with concurrent.futures.ThreadPoolExecutor(max_workers=max(len(self.module_names), 1)) as executor:
36+
future_results = [executor.submit(ExternalModule, x) for x in self.module_names]
37+
concurrent.futures.wait(future_results)
38+
for future in future_results:
39+
self.ema.append(future.result())
40+
41+
42+
def initialize(self):
43+
"""
44+
This method is used to run all initialize methods served at each external models at once.
45+
"""
46+
if len(self.module_names) is not 0:
47+
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.ema)) as executor:
48+
future_results = [executor.submit(x.initialize) for x in self.ema]
49+
concurrent.futures.wait(future_results)
50+
for future in future_results:
51+
while not future.result().done():
52+
pass
53+
54+
def run_step(self):
55+
"""
56+
This method is used to run all run_step methods served at each external models at once.
57+
"""
58+
if len(self.module_names) is not 0:
59+
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.ema)) as executor:
60+
future_results = [executor.submit(x.run_step) for x in self.ema]
61+
concurrent.futures.wait(future_results)
62+
for future in future_results:
63+
while not future.result().done():
64+
pass
65+
66+
def shutdown(self):
67+
"""
68+
This method is used to run all shutdown methods served at each external models at once.
69+
"""
70+
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.ema)) as executor:
71+
future_results = [executor.submit(x.shutdown) for x in self.ema]
72+
concurrent.futures.wait(future_results)
73+
for future in future_results:
74+
while not future.result().done():
75+
pass
76+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""
2+
CLE part of the External Ros Modules API version 2.0.0
3+
"""
4+
5+
__author__ = 'Omer Yilmaz'

0 commit comments

Comments
 (0)