wiki:Tutorial

Version 4 (modified by Frédéric, 11 years ago) ( diff )

--

Tutorial

Architecture

Vocabulary

  • Device
  • Functional Block
  • Datapoint
  • Group Object
  • Flags
  • ETS
  • Stack
  • Group Address

Functional Blocks

This is the central feature of pKNyX, allowing user to create virtual devices which mimics real KNX devices. The Device itself is implemented as a process.

Here is a very simple example which implements a timer. This timer can monitor the state of a light, and switch it off automatically after a delay. The code is taken from the examples/3_timer directory of pKNyX sources.

from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

NAME = "timer"
IND_ADDR = "1.2.3"
LEVEL = "info"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="lights_cmds", desc="Lights commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="Lights states"),
           "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
           "1/3": dict(name="lights_delays", desc="Lights delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }


logger = Logger("%s-%s" % (NAME, IND_ADDR))
logger.setLevel(LEVEL)

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP))

schedule = Scheduler()
notify = Notifier()


class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additionnal init of the timer
        """
        self._timer = 0

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay


def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")

    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")

    print
    ets.printGroat("gad")
    print
    ets.printGroat("go")
    print
    schedule.printJobs()
    print

    # Run the stack main loop (blocking call)
    stack.mainLoop()


if __name__ == "__main__":
    try:
        main()
    except:
        logger.exception("3_main")

That's it! As you can see, concepts used here are simple... This Functional Block can now be used from any other real device of your installation, through Groups Addresses 1/1/1, 1/3/1 and 1/3/1. All you have to do, is to weave (link, bind...) the Group Objects of your real devices to these Groups Addresses, using the real ETS application. Sure, you maye have to change the GAD to match your installation.

Ok, now, lets start this device (invoke the python interpreter as for any other python script):

MainThread::Logger_.__init__(): start new logger 'timer-1.2.3'
MainThread::Scheduler started

GAD                                 Datapoint                 Functional block               DPTID      Flags      Priority  
-----------------------------------------------------------------------------------------------------------------------------
 1 Lights                           
 ├──  1 Lights commands            
 │    ├──   1 Light 'test' (cmd)    cmd                       timer                          1.001      CWT        low       
 ├──  2 Lights States                     
 │    ├──   1 Ligh 'test' (state)   state                     timer                          1.001      CWUI       low       
 ├──  3 Lights delays              
 │    ├──   1 Light 'test' (delay)  delay                     timer                          7.005      CWU        low       

Functional block               Datapoint                 DPTID      GAD                            Flags      Priority  
------------------------------------------------------------------------------------------------------------------------
timer                          delay                     7.005      1/3/1                          CWU        low       
timer                          state                     1.001      1/2/1                          CWUI       low       
timer                          cmd                       1.001      1/1/1                          CWT        low       

Jobstore default:
    TimerFB.updateTimer (trigger: interval[0:00:01], next run at: 2013-08-14 20:21:46.943959)

MainThread::Stack running

Here, your device waits for bus events!

Lets deep inside this example. First, we import some python objects:

from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

These objects are all classes.

Let's define a few constants:

NAME = "timer"
IND_ADDR = "1.2.3"
LEVEL = "info"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="lights_cmds", desc="Lights commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="States"),
           "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
           "1/3": dict(name="lights_delays", desc="Lights delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }

We then instanciante some high level objects and helpers:

logger = Logger("%s-%s" % (NAME, IND_ADDR))
logger.setLevel(LEVEL)

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP))

schedule = Scheduler()
notify = Notifier()

The Logger object is a global logger service, which allows you to output all informations you want, at different levels. pKNyX makes a big usage of that logger. Here, we set the info level to avoid too much outputs (default level is trace, which logs everything).

The Stack object is used to communicate over the bus (real bus, of course, but also virtual bus). We can give it an Individual Address, to mimic real devices behaviour. This address will be used as Source Address when communicating over the bus. The address 0/0/0 is used by default.

Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtal devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind ; if you don't see telegrams you are expecting, the problem may be there.

The ETS object is a tool which works more or less like the real ETS application (see below). It needs one mandatory parameter: the Stack object. The gadMap param is optionnal, and is a simple dictionnary which maps group addresses to names/description. pKNyX does not need this to work, but it can be easier to debug things. Note that it is not really used now, but it will be in the future.

The Scheduler is a helper implementating a powerfull scheduler, based on APScheduler (see below). The Notifier works the same way the Scheduler does, but provides bus notifications rather than time notifications (see below).

Now, it is time to write the central part of our device: a custom Functional Block; this is done by subclassing the FunctionBlock base class, and adding a few attributes/methods:

class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additionnal init of the timer
        """
        self._timer = 0

The DP_xx class attributes are the Datapoints of our Functional Block. Datapoints are the internal vars of the functional bloc which can be exposed to the bus.

The GO_xx class attributes are the Group Objects mapping the Datapoints to the bus through multicast service (Group Address).

They are both defined as python dictionnary; they will be automatically instanciated for us by the framework. The named used here do not matter, as long as it starts with DP_ for Datapoints, and GO_ for Group Objects.

There are a few parameters in the dicts; some are obvious, some will need more explanations. But this is out of the scope of this tutorial.

The _init() method is called when the functional block is instanciated; it can be used to init some additionnal global vars, or make initial tasks. Here, we just create the _timer var, and set it to 0.

Ok, it's time to dig in the active part of our functional block. Let's start with the stateChanged() method:

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

The role of this method is to start/stop the timer depending of the state.

This method as a decorator. pKNyX does not use decorators as they usually are made for; here, the decorator registers the decorated method in the Notifier, which will call this method as soon as the state Datapoint changes because of a bus activity.

Note how we retreive Datapoint value: our functional block has a dp dictionnary, which contains all our datapoints; the keys are the names we used to describe them in the DP_xxx dictionnary. To get the value of the Datapoint, just use the .value property.

Lets have a look at the timer treatement:

    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

Note how this method is periodically called, using the schedule.every() method as python decorator, as for the Notifier. This decorator will automatically register our method and call it every second.

There, we just decrement the timer (if not null), and check if the delay expired. If it is the case, we change the cmd Datapoint value by assigning a new value to the .value property. Doing this, pKNyX will automatically notify the bus of this change, according to the flags of the corresponding GO_xxx object! If things do not work as expected, check that you didn't omit the .value property; if it is the caase, you will simply overwrite the Datapoint itself with the value you want to assign to it!

The last method just manages the delay Datapoint:

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay

I think you got the point ;o)

Ok, we need to write a few more things to get our device working. First, we need to register the functional bloc:

def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")

and use the ETS object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to our group addresses:

    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")

We can print a summary of our mapping:

    print
    ets.printGroat("gad")
    print
    ets.printGroat("go")
    print
    schedule.printJobs()
    print

And finally, launch the framework main loop:

    # Run the stack main loop (blocking call)
    stack.mainLoop()

We now have a process (Device) running, listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the group addresses they are weaved to.

That's it for now. This is the first working dev. release; their are many additional things to do, and final implementation may change, according to feedback/suggestions I will get. But the core is there. Again, the goal of the framework is to provide very high level tools to build complete and powerfull applications and KNX extensions. Unlink linknx, you have a little more to do that simply write a configuration file. But you will find that it is not that complicated (pKNyX handles all nasty stuff for you), and may even simplify things for complex tasks (linknx rules are not that simple).

Note: See TracWiki for help on using the wiki.