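# NOTE: "Newer" / "Older" appear to be navigation labels captured from a web
# source viewer; they are not part of the program.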
import uuid
from threading import Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys
import random
from opcua.ua import NodeId, NodeIdType
sys.path.insert(0, "..")
try:
    # Prefer the IPython shell when it is installed.
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Fallback shell: open a stdlib interactive console.

        The console namespace is the module's global dictionary, updated
        with this function's locals, so names assigned in the console are
        stored in the module globals.
        """
        myvars = globals()
        myvars.update(locals())
        code.InteractiveConsole(myvars).interact()
from opcua import ua, uamethod, Server
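# NOTE: "Gino Tosti committed" appears to be commit-attribution text captured
# from a web blame view; it is not part of the program.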
def checkData(var):
    """Give every still-unset node in ``var`` an initial value.

    For each node whose variant type is Boolean, String, Int16, Int32,
    UInt16 or UInt32 and whose current value is ``None``, write a default
    (``False``, ``"String-Data"`` or ``0``) using that same variant type.
    Nodes of any other type, and nodes that already hold a value, are left
    untouched.

    Args:
        var: iterable of node objects exposing
            ``get_data_type_as_variant_type()``, ``get_value()`` and
            ``set_value(value, varianttype)``.
    """
    # (variant type, default value) pairs, checked in order.  Equality is
    # used instead of a dict lookup so the variant-type objects do not need
    # to be hashable.
    defaults = (
        (ua.VariantType.Boolean, False),
        (ua.VariantType.String, "String-Data"),
        (ua.VariantType.Int16, 0),
        (ua.VariantType.Int32, 0),
        (ua.VariantType.UInt16, 0),
        (ua.VariantType.UInt32, 0),
    )
    for node in var:
        variant_type = node.get_data_type_as_variant_type()
        for candidate, default in defaults:
            if variant_type == candidate:
                # Only read the value for types we know how to initialise.
                if node.get_value() is None:
                    node.set_value(default, candidate)
                break
class SubHandler(object):
    """
    Subscription handler: receives data-change and event callbacks for a
    subscription and echoes each one to standard output.
    """

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is not used."""
        label = "Python: New data change event"
        print(label, node, val)

    def event_notification(self, event):
        """Print the received event."""
        label = "Python: New event"
        print(label, event)
class VarUpdater(Thread):
    """Background thread that periodically writes random values to nodes.

    On every pass each node in ``var`` is updated according to its variant
    type:

    * Double: the current value plus a random step in [-5, 5], or just the
      random step when the node has no value yet.
    * Int16 / Int32: a random integer in [0, 8].

    Nodes of any other type are left alone.  The thread sleeps two seconds
    between passes and exits after ``stop()`` has been called.
    """

    def __init__(self, var):
        """Store the iterable of nodes to update; the thread is not started."""
        Thread.__init__(self)
        self._stopev = False
        self.var = var

    def stop(self):
        """Ask ``run`` to exit; it is checked once per pass."""
        self._stopev = True

    def run(self):
        """Update the nodes in a loop until ``stop()`` is called."""
        while not self._stopev:
            for node in self.var:
                variant_type = node.get_data_type_as_variant_type()
                if variant_type == ua.VariantType.Double:
                    # Read the value once so the None check and the
                    # addition operate on the same reading.
                    current = node.get_value()
                    step = random.uniform(-5, 5)
                    node.set_value(step if current is None else current + step)
                elif variant_type == ua.VariantType.Int16:
                    node.set_value(random.randint(0, 8), ua.VariantType.Int16)
                elif variant_type == ua.VariantType.Int32:
                    node.set_value(random.randint(0, 8), ua.VariantType.Int32)
            # NOTE(review): the original indentation was lost; the pause is
            # assumed to follow each full pass rather than each node.
            time.sleep(2)
if __name__ == "__main__":
    # Optional: only report errors from the logging subsystem.
    logging.basicConfig(level=logging.ERROR)

    server = Server()
    #server.disable_clock()
    # Example of a concrete endpoint:
    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    # NOTE(review): "$Url", "$Assembly" and "$Model" look like template
    # placeholders that are substituted before this script is run - confirm.
    server.set_endpoint("$Url")
    server.set_server_name("$Assembly Simulation Server")

    # Set all possible endpoint policies for clients to connect through.
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256Sha256_Sign])

    uri = "http://localhost/$Assembly/"
    idx = server.register_namespace(uri)

    # Import the node model from XML.
    server.import_xml("$Model")

    # Walk down from the root node to the nodes that will be simulated.
    root = server.get_root_node()
    obj = root.get_children()[0].get_children()[1].get_children()
    get = obj[0].get_children()

    # NOTE(review): the original called checkData(set1), but no "set1" is
    # defined anywhere in the visible source, so it raised NameError before
    # the server could start.  "get" is the only node list available here;
    # confirm whether a second list (e.g. another child of "obj") was meant.
    checkData(get)

    # Start the server, then the background updater for the numeric
    # monitoring variables.
    server.start()
    vup = VarUpdater(get)
    vup.start()
    print("Server Started!")
    try:
        # Block in an interactive shell until the user exits it.
        embed()
    finally:
        vup.stop()
        # Wait for the updater to finish its current pass so it cannot
        # write to nodes after the server has been stopped.
        vup.join()
        server.stop()
        print("Server Stopped!")