Skip to content

Async tool not awaited #9580

@michiel2002v

Description

@michiel2002v

Describe the bug
Hey! I'm fairly new to Haystack, but recently created a tool to explore a Unified NameSpace in MQTT. However, I constantly get an error that these async methods are not waited for.

Error message

RuntimeWarning: coroutine 'get_all_uns_topics' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Expected behavior
Normally, my method should return a list of all topics on the broker.

Additional context

@tool
async def get_all_uns_topics(
    uns_topic:str="v1/uns/#", 
    mqtt_host:str="emqx.com.demo.captureplatform.com", 
    mqtt_port:int=8883
    )  -> list[str]:
    """
    Query all available topics (structures) in the Unified Namespace.
    """
    topics = set()
    client_id = f"uns-client-{int(time.time())}"

    def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            client.subscribe(uns_topic)
            print(f"Connected successfully, subscribing to {uns_topic} topics.")
        else:
            print(f"Connection failed with code {rc}")

    def on_message(client, userdata, msg):
        if msg.topic not in topics:
            print(f"Received message on topic: {msg.topic}")
            topics.add(msg.topic)

    print(f"Connecting to MQTT broker with params: {Secret.from_env_var("MQTT_USERNAME")} and {mqtt_host}")
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    client.username_pw_set(Secret.from_env_var("MQTT_USERNAME"), Secret.from_env_var("MQTT_PASSWORD"))
    client.on_connect = on_connect
    client.on_message = on_message
    client.tls_set()
    client.connect(mqtt_host, mqtt_port, 60)
    loop = asyncio.get_running_loop()
    helper = AsyncioHelper(loop, client)

    await asyncio.sleep(5)

    client.disconnect()

    sorted_topics = sorted(topics)
    return sorted_topics
uns_explorer_agent = Agent(
    chat_generator=chat_generator,
    tools=[get_all_uns_topics, get_latest_uns_values],
    system_prompt="""You are an expert assistant trained to explore the Unified Namespace (UNS) of a production plant or company.
                Use the provided tools to query available topics and their latest values.
                Do not make assumptions.
                """
)

import asyncio

query = "What are the available topics in the UNS of Vintecc?"
messages = [ChatMessage.from_user(query)]

uns_explorer_agent.warm_up()

async def main():
    agent_output = await uns_explorer_agent.run_async(messages=messages)
    print(agent_output["messages"][-1].text)
    print("--- Document Agent Output ---")
    print(agent_output)

asyncio.run(main())

System:

  • OS: windows
  • GPU/CPU: cpu
  • Haystack version (commit or version number): latest
  • generator: OpenAIChatGenerator(model="gpt-4o-mini")

Metadata

Metadata

Assignees

No one assigned

    Labels

    P3Low priority, leave it in the backlog

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions