#!/usr/bin/env python
import rospy
import moveit_commander
from sensor_msgs.msg import JointState
import csv
import time
class UR10eMoverRecorder:
def __init__(self, duration=60.0, rate_hz=10, output_file="/tmp/ur10e_joint_log.csv"):
# Duration and logging config
self.duration = duration
self.rate_hz = rate_hz
self.output_file = output_file
# Data containers
self.joint_names = []
self.joint_data = []
# Initialize ROS
rospy.init_node("ur10e_move_and_record", anonymous=True)
moveit_commander.roscpp_initialize([])
# MoveIt interfaces
self.robot = moveit_commander.RobotCommander()
self.scene = moveit_commander.PlanningSceneInterface()
self.move_group = moveit_commander.MoveGroupCommander("manipulator")
# Subscribe to joint states
rospy.Subscriber("/joint_states", JointState, self.joint_state_callback)
def joint_state_callback(self, msg):
if not self.joint_names:
self.joint_names = list(msg.name)
positions = dict(zip(msg.name, msg.position))
timestamp = time.time()
row = [timestamp] + [positions.get(name, 0.0) for name in self.joint_names]
self.joint_data.append(row)
def move_and_record(self):
rospy.loginfo("Starting UR10e movement and logging...")
start_time = time.time()
rate = rospy.Rate(self.rate_hz)
while not rospy.is_shutdown() and (time.time() - start_time < self.duration):
# Plan and execute a random target
self.move_group.set_random_target()
plan = self.move_group.plan()
if plan and len(plan.joint_trajectory.points) > 0:
self.move_group.execute(plan, wait=True)
else:
rospy.logwarn("Planning failed, skipping this cycle.")
rospy.sleep(1) # brief pause before next move
rate.sleep() # maintain logging frequency
self.save_to_csv()
rospy.loginfo(f"Data logging complete. Saved to {self.output_file}")
def save_to_csv(self):
with open(self.output_file, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["time"] + self.joint_names)
writer.writerows(self.joint_data)
if __name__ == "__main__":
try:
app = UR10eMoverRecorder(duration=60.0, rate_hz=10, output_file="/tmp/ur10e_joint_log.csv")
app.move_and_record()
except rospy.ROSInterruptException:
pass