[Documentation] [TitleIndex] [WordIndex

Collecting useful helper functions here until they can find a proper home

   1 def is_publishing(node_name, topic_name):
   2    topics_published = [info(s).name for s in info(nodes[node_name]).subs] # <-- this one was a bit tricky to work out
   3    return True in [s.find(topic_name) >= 0 for s in topics_published]

credit: Patrick Bouffard

   1 def wait_for_topic(node_name, topic_name, sleep_time=0.5):
   2    loginfo("Waiting for topic %s from node %s" % (topic_name, node_name))
   3    while not is_publishing(node_name, topic_name):
   4        sleep(sleep_time)
   5    loginfo("Topic found!")

credit: Patrick Bouffard

To be implemented

   1 wait_for(lambda x: x.fieldname > 42, '/some/topic')
   2 do_cool_stuff()

2024-11-23 17:51