# minserver.tmpl
import copy
import logging
import random
import sys
import time
import uuid
from datetime import datetime
from math import sin
from threading import Event, Thread

from opcua.ua import NodeId, NodeIdType

sys.path.insert(0, "..")

# Prefer IPython's interactive shell when it is installed; otherwise fall
# back to a minimal standard-library console with the same ``embed()`` name.
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Run a standard-library interactive console over this module's globals."""
        # globals() returns the live module namespace (not a copy), so names
        # bound inside the console end up bound in this module too.
        myvars = globals()
        # At this point locals() only contains ``myvars`` itself, so this call
        # adds a ``myvars`` entry to the module namespace.
        myvars.update(locals())
        shell = code.InteractiveConsole(myvars)
        # Returns only once the console session ends.
        shell.interact()

from opcua import ua, uamethod, Server


class SubHandler(object):
    """
    Subscription handler: receives the notifications a server sends for a
    subscription and echoes each one to standard output.
    """

    @staticmethod
    def _echo(headline, *details):
        # Single place that writes console output for every callback.
        print(headline, *details)

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but unused."""
        self._echo("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print the received event."""
        self._echo("Python: New event", event)

class VarUpdater(Thread):
    """
    Background thread that periodically perturbs the values of a set of nodes.

    On every pass, each node in ``var`` whose variant type is neither String
    nor Boolean gets a random offset between -5 and 5 added to its current
    value; a node that has no value yet (``None``) is set to the offset itself.
    """

    def __init__(self, var, interval=2.0):
        """
        :param var: iterable of nodes; each must provide
            ``get_data_type_as_variant_type()``, ``get_value()`` and
            ``set_value()``
        :param interval: pause, in seconds, between two update passes
        """
        Thread.__init__(self)
        # An Event rather than a plain boolean so that stop() can interrupt
        # the pause between passes instead of waiting for it to elapse.
        self._stopev = Event()
        self.var = var
        self.interval = interval

    def stop(self):
        """Request termination; a thread pausing between passes wakes at once."""
        self._stopev.set()

    def run(self):
        """Update every node, pause for ``interval`` seconds, repeat until stopped."""
        while not self._stopev.is_set():
            for v in self.var:
                self._nudge(v)
            # Interruptible replacement for time.sleep(): returns early as
            # soon as stop() is called.
            self._stopev.wait(self.interval)

    @staticmethod
    def _nudge(node):
        """Apply one random step to ``node`` unless it is a String or Boolean."""
        ty = node.get_data_type_as_variant_type()
        if ty == ua.VariantType.String or ty == ua.VariantType.Boolean:
            return
        # Read the value once so the None check and the arithmetic below are
        # guaranteed to see the same value.
        current = node.get_value()
        step = random.uniform(-5, 5)
        # NOTE(review): a current value that does not support ``+ float``
        # would raise here and end the thread - confirm that the model only
        # contains numeric variables besides strings and booleans.
        node.set_value(step if current is None else current + step)



if __name__ == "__main__":
    # optional: setup logging
    logging.basicConfig(level=logging.ERROR)

    server = Server()
    # "$Url", "$Assembly" and "$Model" look like template placeholders that
    # are presumably substituted before this script is run - TODO confirm.
    # Example endpoint: opc.tcp://localhost:4840/freeopcua/server/
    server.set_endpoint("$Url")
    server.set_server_name("$Assembly Simulation Server")
    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
                ua.SecurityPolicyType.NoSecurity,
                ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
                ua.SecurityPolicyType.Basic256Sha256_Sign])

    uri = "http://localhost/$Assembly/"
    idx = server.register_namespace(uri)
    # import some nodes from xml
    server.import_xml("$Model")

    # Pick the nodes to simulate by position: the children of the third child
    # of the root node's first child.
    # NOTE(review): this depends on the child order of the imported model -
    # verify against the model file.
    root = server.get_root_node()
    obj=root.get_children()[0].get_children()
    get=obj[2].get_children()

    # starting Server
    server.start()
    vup = VarUpdater(get)  # updates the monitored variables (String/Boolean ones are skipped)
    try:
        # Started inside the try block so the server is still shut down if
        # the updater thread cannot be started.
        vup.start()
        print("Server Started!")
        embed()
    finally:
        vup.stop()
        # Wait for the updater to finish before stopping the server so it is
        # not still writing values during shutdown. The is_alive() guard
        # avoids joining a thread that never started.
        if vup.is_alive():
            vup.join()
        server.stop()
        print("Server Stopped!")