-
Notifications
You must be signed in to change notification settings - Fork 370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unable to get subscribe example working #1746
Comments
The server sends a publish event only once:
|
OK, got it working. Two things:
Here is the complete example:
# server.py
import asyncio
import logging
from asyncua import Server, Client, Node
from datetime import datetime
class OPCUAServer:
    """Demo OPC UA server that imports a nodeset and keeps updating two variables.

    After :meth:`init` the server exposes ``Process/Trigger`` and
    ``Process/PartID``; :meth:`start` then rewrites them in an endless loop so
    that subscribed clients have data changes to observe.
    """

    def __init__(
        self,
        endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/",
        nodeset_path: str = "UA_NodeSet.xml",
        update_interval: float = 5,
    ):
        """Create an unconfigured server; await :meth:`init` before :meth:`start`.

        Args:
            endpoint: URL the server listens on.
            nodeset_path: XML nodeset file imported by :meth:`init`.
            update_interval: Seconds to sleep after each simulated update step.
        """
        self.endpoint = endpoint
        self.nodeset_path = nodeset_path
        self.update_interval = update_interval
        self.server = Server()
        # Populated by init(); None means the server is not configured yet.
        self.trigger_var = None
        self.part_id_var = None
        self.namespace_idx = None

    async def init(self):
        """Set the endpoint, register the namespace and import the nodeset.

        Also resolves the Trigger and PartID variable nodes, marks them
        writable and prints their node ids.
        """
        await self.server.init()
        self.server.set_endpoint(self.endpoint)
        await self.server.set_application_uri("urn:example:opcua:server")

        # The returned index is used below to build the browse paths.
        uri = "http://examples.freeopcua.github.io"
        self.namespace_idx = await self.server.register_namespace(uri)
        await self.server.import_xml(self.nodeset_path)

        process = await self.server.nodes.objects.get_child(f"{self.namespace_idx}:Process")
        self.trigger_var = await process.get_child([f"{self.namespace_idx}:Trigger"])
        self.part_id_var = await process.get_child([f"{self.namespace_idx}:PartID"])

        # Set variables writable
        await self.trigger_var.set_writable()
        await self.part_id_var.set_writable()

        print(f"Server namespace index: {self.namespace_idx}")
        print(f"Trigger node id: {self.trigger_var.nodeid}")
        print(f"PartID node id: {self.part_id_var.nodeid}")

    async def _write_and_report(self, var, value):
        """Write ``value`` to ``var``, read it back and print the result."""
        await var.write_value(value)
        name = (await var.read_browse_name()).Name
        current = await var.read_value()
        print(f"SERVER: {name} = {current}")

    async def start(self):
        """Run the server and simulate Trigger/PartID updates forever.

        Raises:
            RuntimeError: If :meth:`init` has not been awaited first.
        """
        # Fail early with a clear message instead of an AttributeError on None.
        if self.trigger_var is None or self.part_id_var is None:
            raise RuntimeError("OPCUAServer.init() must be awaited before start()")

        async with self.server:
            while True:
                current_time = datetime.now().strftime("%H:%M:%S")
                await self._write_and_report(self.trigger_var, True)
                await self._write_and_report(self.part_id_var, f"PART_{current_time}")
                await asyncio.sleep(self.update_interval)
                await self._write_and_report(self.trigger_var, False)
                await asyncio.sleep(self.update_interval)
# client.py
class SubscriptionHandler:
    """Subscription callback object that prints every data change it receives."""

    async def datachange_notification(self, node: Node, val, data):
        """Print the new value of ``node``; report (rather than raise) any error."""
        try:
            message = f"New value for {node}: {val}"
            print(message)
        except Exception as err:
            print(f"Error in notification handler: {err}")
class OPCUAClient:
    """Demo OPC UA client that subscribes to Trigger/PartID and rewrites PartID."""

    def __init__(self, url: str = "opc.tcp://localhost:4840/freeopcua/server/"):
        """Remember ``url`` and build the client for it (no connection yet)."""
        self.url = url
        self.client = Client(url=self.url)

    async def subscribe_to_variables(self):
        """Connect, subscribe to both variables, then write PartID once a second.

        Any exception is printed and re-raised.
        """
        async with self.client:
            try:
                # Resolve the namespace index used in the browse paths below.
                namespace_uri = "http://examples.freeopcua.github.io"
                ns = await self.client.get_namespace_index(namespace_uri)
                print(f"Client namespace index: {ns}")

                # Walk Objects -> Process -> Trigger / PartID.
                process = await self.client.get_objects_node().get_child(f"{ns}:Process")
                trigger = await process.get_child(f"{ns}:Trigger")
                part_id = await process.get_child(f"{ns}:PartID")
                print(f"Found trigger node: {trigger.nodeid}")
                print(f"Found part_id node: {part_id.nodeid}")

                # One subscription covering both variables.
                sub = await self.client.create_subscription(100, handler=SubscriptionHandler())
                await sub.subscribe_data_change([trigger, part_id])

                # Keep the session alive, writing a fresh PartID every second.
                while True:
                    stamp = datetime.now().strftime("%H:%M:%S")
                    await part_id.write_value(f"PART_CLIENT_{stamp}")
                    label = (await part_id.read_browse_name()).Name
                    current = await part_id.read_value()
                    print(f"CLIENT: {label} = {current}")
                    await asyncio.sleep(1)
            except Exception as err:
                print(f"Error in client: {err}")
                raise
# Example XML configuration; main() writes this text to UA_NodeSet.xml.
# It declares the namespace http://examples.freeopcua.github.io and three
# nodes: the Process object (a component of i=85) plus its two component
# variables, Trigger (Boolean) and PartID (String).
XML_CONTENT = """<?xml version="1.0" encoding="utf-8"?>
<UANodeSet xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:uax="http://opcfoundation.org/UA/2008/02/Types.xsd" xmlns="http://opcfoundation.org/UA/2011/03/UANodeSet.xsd">
<NamespaceUris>
<Uri>http://examples.freeopcua.github.io</Uri>
</NamespaceUris>
<UAObject NodeId="ns=1;i=1" BrowseName="1:Process">
<DisplayName>Process</DisplayName>
<References>
<Reference ReferenceType="HasComponent" IsForward="false">i=85</Reference>
</References>
</UAObject>
<UAVariable NodeId="ns=1;i=2" BrowseName="1:Trigger" DataType="Boolean">
<DisplayName>Trigger</DisplayName>
<References>
<Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
</References>
</UAVariable>
<UAVariable NodeId="ns=1;i=3" BrowseName="1:PartID" DataType="String">
<DisplayName>PartID</DisplayName>
<References>
<Reference ReferenceType="HasComponent" IsForward="false">ns=1;i=1</Reference>
</References>
</UAVariable>
</UANodeSet>
"""
# main.py
async def main():
    """Write the demo nodeset to disk, then run the server and client together."""
    # The XML declaration says utf-8, so write the file with that encoding
    # rather than the platform default.
    with open("UA_NodeSet.xml", "w", encoding="utf-8") as f:
        f.write(XML_CONTENT)

    # The server must be initialised (nodeset imported) before it is started.
    server = OPCUAServer()
    await server.init()

    client = OPCUAClient()

    # Run both server and client concurrently.
    # NOTE(review): the client task begins connecting while the server task is
    # still starting up; confirm the server is listening by then.
    await asyncio.gather(server.start(), client.subscribe_to_variables())
if __name__ == "__main__":
# logging.basicConfig(level=logging.INFO)
asyncio.run(main())
Then the output looks as expected:
|
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Describe the bug
Unable to get the client subscription to work.
To Reproduce
See code.… [1]
Expected behavior
If data is changed on the server, the client should be notified via the subscription.
Version
Python-Version: 3.11.9
opcua-asyncio Version (e.g. master branch, 0.9): 1.1.5
[1]
The text was updated successfully, but these errors were encountered: