import uuid
from threading import Event, Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys
import random

from opcua.ua import NodeId, NodeIdType

sys.path.insert(0, "..")

try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed."""
        myvars = globals()
        myvars.update(locals())
        shell = code.InteractiveConsole(myvars)
        shell.interact()

from opcua import ua, uamethod, Server


class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value for a data change notification."""
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print an event delivered to the subscription."""
        print("Python: New event", event)


class VarUpdater(Thread):
    """Background thread that periodically perturbs a list of variable nodes.

    On every pass, each node whose variant type is neither String nor Boolean
    has a random offset from [-5, 5] added to its current value (or is set to
    the offset itself when it has no value yet).

    Args:
        var: iterable of node objects exposing ``get_data_type_as_variant_type``,
            ``get_value`` and ``set_value``.
        interval: seconds to wait between two passes (default 2).
    """

    def __init__(self, var, interval=2):
        Thread.__init__(self)
        # An Event (rather than a bare bool) lets stop() wake the thread up
        # in the middle of its wait instead of after the full interval.
        self._stopev = Event()
        self.var = var
        self.interval = interval

    def stop(self):
        """Ask the thread to finish; returns immediately without joining."""
        self._stopev.set()

    def run(self):
        """Update the nodes in a loop until stop() is called."""
        skipped_types = (ua.VariantType.String, ua.VariantType.Boolean)
        while not self._stopev.is_set():
            for v in self.var:
                if v.get_data_type_as_variant_type() in skipped_types:
                    continue
                # Read once so the None check and the sum use the same value.
                current = v.get_value()
                delta = random.uniform(-5, 5)
                # NOTE(review): the written value is always a float, even for
                # integer-typed nodes -- confirm the server accepts that.
                v.set_value(delta if current is None else current + delta)
            # Interruptible pause between passes.
            self._stopev.wait(self.interval)


if __name__ == "__main__":
    # optional: setup logging
    logging.basicConfig(level=logging.ERROR)

    server = Server()
    # NOTE(review): "$Url", "$Assembly" and "$Model" look like template
    # placeholders that are substituted before this script is run -- confirm.
    #server.disable_clock()
    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("$Url")
    server.set_server_name("$Assembly Simulation Server")

    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256Sha256_Sign])

    uri = "http://localhost/$Assembly/"
    idx = server.register_namespace(uri)

    # import some nodes from xml
    server.import_xml("$Model")

    root = server.get_root_node()
    # Children of the root's first child; the third of them holds the
    # variables to simulate.
    # NOTE(review): relies on the child order of the imported model -- confirm.
    obj = root.get_children()[0].get_children()
    get = obj[2].get_children()

    # starting Server
    server.start()
    # Periodically updates the monitored variables (String and Boolean
    # variables are left untouched).
    vup = VarUpdater(get)
    try:
        vup.start()
        print("Server Started!")
        embed()
    finally:
        vup.stop()
        # Wait for the updater to finish its current pass so that it is not
        # writing to nodes while the server shuts down.
        if vup.is_alive():
            vup.join()
        server.stop()
        print("Server Stopped!")