Tutorial
This is a draft version.
Vocabulary
Before we dive into this tutorial, let's define a few words. I tried to use the KNX vocabulary, and hope I understood it correctly. Feel free to contact me if you find mistakes.
A basic knowledge of KNX is required.
- Group Address: multicast destination address of a bus datagram
- Individual Address: physical source address of a bus datagram
- Flags: settings of a Group Object, controlling how it is used for bus communication
- Stack: code implementing KNX OSI layers
- Datapoint: internal variable of a Functional Block
- Group Object: multicast communication proxy of a Datapoint
- Functional Block: black box containing Datapoints and implementing features
- Device: set of Functional Block-s running as a process
- ETS: tool used to weave the Group Object-s of the Device-s' Functional Block-s through Group Address-es
Functional Block
This is the central feature of pKNyX. This is where we will implement needed logic.
A Functional Block is implemented as a Python class, and has Datapoint-s and Group Object-s as attributes. The Datapoint-s reflect the state of the Functional Block; the Group Object-s expose the Datapoint-s to the KNX bus. When implementing a custom Functional Block, we will set/get its Datapoint-s. On the other hand, we won't alter its Group Object-s, which are only used internally for bus communication; we only control how they will be used through their Flags.
Usually, a Functional Block should remain simple; if complex features are needed, it is better to split them into several Functional Block-s, woven together through Group Address-es. This is better for readability and reusability, and allows better granularity during deployment.
Functional Block-s can be deployed as one or several Device-s. In pKNyX, each Device runs as a process. All Device-s can run on the same machine, or be spread over a set of machines, like in the real KNX world.
Create device structure
Like any good framework, pKNyX helps you to start writing code by auto-generating some useful files, in a specific hierarchy, to implement a Device in a correct way.
The tool to do that is pknyx-admin.py. This is a global tool to manage devices (create/check/run). Let's see how to use it to create a fresh device:
$ pknyx-admin.py createdevice timer
Generate 'timer' structure from template...
'timer' dir created
'timer/admin.py' file generated
'timer/timer' dir created
'timer/timer/__init__.py' file generated
'timer/timer/settings.py' file generated
'timer/timer/device.py' file generated
'timer/timer/timerFB.py' file generated
'timer' structure done
This will create a small tree under a dir named timer:
$ tree timer
timer
├── admin.py
└── timer
    ├── device.py
    ├── __init__.py
    ├── settings.py
    └── timerFB.py

1 directory, 5 files
The top-level dir timer can be renamed as you want. All further references are relative to this dir.
This dir contains a script called admin.py; this script is in fact the pknyx-admin.py one, with a pre-defined env var pointing to our timer structure, in order to manage it. It is possible to use the global pknyx-admin.py script, but you would then need to define the $PKNX_DEVICE_PATH var and make it point to the second timer dir, so that the Python interpreter can correctly import our files. So, for now, let's use the admin.py script.
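The role of such an env var can be pictured with a few lines of plain Python. This is only a sketch of the general mechanism, not the actual pknyx-admin.py code: the add_device_path() helper is hypothetical, and only the PKNX_DEVICE_PATH name comes from the text above.

```python
import os
import sys


def add_device_path(env_var="PKNX_DEVICE_PATH"):
    """Make the dir named by env_var importable.

    Hypothetical helper, for illustration only: it shows why the
    interpreter needs to know where settings.py, device.py... live.
    Returns the absolute dir, or None if the variable is not set.
    """
    path = os.environ.get(env_var)
    if not path:
        return None
    path = os.path.abspath(path)
    if path not in sys.path:
        # Put the device dir first, so its modules are found by 'import'
        sys.path.insert(0, path)
    return path
```

Once the dir is in sys.path, a plain "import settings" can find the generated files.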
The timer/settings.py file contains a few pre-defined constants:
# -*- coding: utf-8 -*-

from pknyx.common import config

DEVICE_NAME = "timer"
DEVICE_IND_ADDR = "1.1.1"
DEVICE_VERSION = "0.1"

config.LOGGER_LEVEL = "info"
This is where we will add new config values for our Device, if needed.
The next file, timer/device.py, contains a Device example:
# -*- coding: utf-8 -*-

from pknyx.api import Device

from timerFB import TimerFB


class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timer_fb", dp="dp_01", gad="1/1/1")

    DESC = "Timer"


DEVICE = Timer
The last generated file is timer/timerFB.py; it contains a Functional Block example:
# -*- coding: utf-8 -*-

from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify


class TimerFB(FunctionalBlock):
    DP_01 = dict(name="dp_01", access="output", dptId="1.001", default="Off")

    GO_01 = dict(dp="dp_01", flags="CWT", priority="low")

    DESC = "Timer FB"
Timer implementation
OK, let's start to implement our simple timer example. This timer monitors the state of a light, and switches it off automatically after a delay.
First, modify the timerFB.py file as follows:
# -*- coding: utf-8 -*-

from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify


class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def init(self):
        self._timer = 0

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            logger.info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                logger.info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        if self._timer:
            delay = self.dp["delay"].value
            logger.info("%s: delay changed; restart timer" % self._name)
            self._timer = delay
Then, modify device.py like this:
# -*- coding: utf-8 -*-

from pknyx.api import Device

from timerFB import TimerFB


class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    # The 'fb' value of each link must match the 'name' given in FB_01
    LNK_01 = dict(fb="timer_fb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timer_fb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timer_fb", dp="delay", gad="1/3/1")

    DESC = "Timer"


DEVICE = Timer
That's it! As you can see, the concepts used here are simple... This Functional Block can now be used from any other real device of your installation, through the Group Address-es 1/1/1, 1/2/1 and 1/3/1. All you have to do is weave the Group Object-s of your real devices to these Group Address-es, using the real ETS application. Of course, you may have to change the Group Address-es to match your installation.
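For readers less familiar with KNX addressing, here is a small sketch of how such addresses map to the 16-bit values carried by the bus, assuming the usual 3-level group address layout (5/3/8 bits) and the area.line.device layout (4/4/8 bits) for individual addresses. The two function names are made up for this example; they are not part of the pKNyX API.

```python
def encode_group_address(gad):
    """Encode a 3-level group address 'main/middle/sub' as a 16-bit int."""
    main, middle, sub = (int(part) for part in gad.split("/"))
    if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255):
        raise ValueError("invalid group address: %r" % gad)
    return (main << 11) | (middle << 8) | sub


def encode_individual_address(addr):
    """Encode an individual address 'area.line.device' as a 16-bit int."""
    area, line, device = (int(part) for part in addr.split("."))
    if not (0 <= area <= 15 and 0 <= line <= 15 and 0 <= device <= 255):
        raise ValueError("invalid individual address: %r" % addr)
    return (area << 12) | (line << 8) | device


for gad in ("1/1/1", "1/2/1", "1/3/1"):
    print("%s -> 0x%04X" % (gad, encode_group_address(gad)))
# 1/1/1 -> 0x0901
# 1/2/1 -> 0x0A01
# 1/3/1 -> 0x0B01
```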
OK, now, let's check our device, using the admin.py script:
$ ./admin.py checkdevice
Logger level is 'info'
Config path is './timer'
Device name is 'timer'
Device Individual Address is '1.1.1'
No error found
$
Looks good. So, we can now really run the device:
$ ./admin.py rundevice
Logger level is 'info'
Config path is './timer'
Device name is 'timer'
Device Individual Address is '1.1.1'
Detaching is 'False'
Scheduler started
Stack running
Here, the call is blocking, on the Stack mainloop, waiting for incoming telegrams from the bus. Here is what you can see if you switch on the light:
timerfb: start timer for 10s
timerfb: timer expired; switch off
And the light should have been switched off after 10s.
To exit from the device, just use Ctrl-C.
Important note: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind; if you don't see the telegrams you are expecting, the problem may be there.
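To picture why two virtual devices sharing the same Individual Address cannot talk to each other, here is a tiny sketch of such a filter. It is not the pKNyX Stack code: the should_process() function and the tuple layout of the telegrams are assumptions made for this example.

```python
def should_process(telegram_source, own_address):
    """Return True if a telegram must be handled, False if it must be dropped."""
    # A telegram carrying our own source address is considered as sent by us
    return telegram_source != own_address


own = "1.1.1"
incoming = [
    ("1.1.1", "1/2/1", "On"),   # same source address as ours: dropped
    ("1.1.2", "1/2/1", "On"),   # different source address: handled
]
handled = [t for t in incoming if should_process(t[0], own)]
print(handled)  # [('1.1.2', '1/2/1', 'On')]
```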
So, let's dig deeper into this example to explain how things work, starting with the timer/timerFB.py module.
First, we import some Python objects:
from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify
FunctionalBlock is a class; logger, schedule and notify are instances.
The logger object is a global logger service, which allows you to output all the information you want, at different levels. pKNyX makes heavy use of that logger. By default, the template sets the logger level to info, but you can set it to other levels. See the documentation.
schedule is a helper implementing a powerful scheduler, based on APScheduler (see below).
notify works the same way schedule does, but provides bus notifications rather than time notifications (see below).
Now, it is time to write the central part of our device: a custom Functional Block; this is done by subclassing the FunctionalBlock base class, and adding a few attributes/methods:
class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def init(self):
        self._timer = 0
The DP_ class attributes are the Datapoint-s of our Functional Block.
The GO_ class attributes are the Group Object-s mapping the Datapoint-s to the bus through the multicast service (Group Address).
Both are defined as Python dictionaries; they will be automatically instantiated for us by the framework. The names used here do not matter, as long as they start with DP_ for Datapoint-s, and GO_ for Group Object-s.
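To give an idea of how a framework can pick up such attributes by their prefix, here is a plain Python sketch. It is not how pKNyX actually does it (the real mechanism is not described here): the collect_by_prefix() helper and the DemoFB class are made up for the illustration.

```python
def collect_by_prefix(cls, prefix):
    """Return the dict class attributes of cls whose name starts with prefix."""
    found = {}
    for name in dir(cls):
        value = getattr(cls, name)
        if name.startswith(prefix) and isinstance(value, dict):
            found[name] = value
    return found


class DemoFB(object):
    # Only the prefix matters; what follows it is free
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_anything = dict(name="state", access="input", dptId="1.001", default="Off")
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    DESC = "Demo FB"


datapoints = collect_by_prefix(DemoFB, "DP_")
print(sorted(datapoints))  # ['DP_01', 'DP_anything']
```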
There are a few parameters in the dicts; some are obvious, some will need more explanations. But this is out of the scope of this tutorial.
The init() method is called at the end of the Functional Block instantiation (creation); it can be used to init some additional instance vars, or to perform initial tasks. Here, we just create the _timer var, and set it to 0.
OK, it's time to dig into the active part of our Functional Block:
@notify.datapoint(dp="state", condition="change")
def stateChanged(self, event):
    if event['newValue'] == "On":
        delay = self.dp["delay"].value
        # Use the logger instance imported from pknyx.api
        logger.info("%s: start timer for %ds" % (self._name, delay))
        self._timer = delay
    elif event['newValue'] == "Off":
        if self._timer:
            logger.info("%s: switched off detected; cancel timer" % self._name)
            self._timer = 0
The role of the stateChanged() method is to start/stop the timer depending on the state.
This method has a decorator, @notify.datapoint(). pKNyX does not use decorators the way they are usually used: here, the decorator registers stateChanged() in notify, which will call this method as soon as the state Datapoint changes because of bus activity.
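The idea of a decorator which registers a function, rather than wrapping it, can be sketched in plain Python. This toy Notifier class is not the pKNyX implementation: its fire() method and the 'oldValue' key are assumptions made for the example (only the decorator form and the 'newValue' key come from the code above), and plain functions are used instead of methods to keep things short.

```python
class Notifier(object):
    """Toy registry mapping a datapoint name to the functions to call."""

    def __init__(self):
        self._handlers = {}

    def datapoint(self, dp, condition="change"):
        def decorator(func):
            # Register the function, then return it unchanged (no wrapping)
            self._handlers.setdefault(dp, []).append((condition, func))
            return func
        return decorator

    def fire(self, dp, old_value, new_value):
        """Call the handlers registered for dp; return how many were called."""
        event = {"dp": dp, "oldValue": old_value, "newValue": new_value}
        called = 0
        for condition, func in self._handlers.get(dp, []):
            if condition == "change" and old_value == new_value:
                continue  # value did not change: skip this handler
            func(event)
            called += 1
        return called


notify = Notifier()
seen = []


@notify.datapoint(dp="state", condition="change")
def state_changed(event):
    seen.append(event["newValue"])


notify.fire("state", "Off", "On")  # value changed: handler is called
notify.fire("state", "On", "On")   # same value: handler is skipped
print(seen)  # ['On']
```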
Note how we retrieve the Datapoint value: our Functional Block has a dp dictionary, which contains all our Datapoint-s; the keys are the names we used to describe them in the DP_ dictionaries. To get/set the value of a Datapoint, just use its .value property.
Let's now have a look at the timer handling:
@schedule.every(seconds=1)
def updateTimer(self):
    if self._timer:
        self._timer -= 1
        if not self._timer:
            logger.info("%s: timer expired; switch off" % self._name)
            self.dp["cmd"].value = "Off"
Note how this method is called periodically, using the schedule.every() decorator, which works like the notify one. This decorator will automatically register the method, and schedule will call it every second.
There, we just decrement the timer (if not zero), and check whether the delay has expired. If so, we change the cmd Datapoint value by assigning a new value to its .value property. When we do this, pKNyX automatically notifies the bus of the change, according to the flags of the corresponding GO_ object! If things do not work as expected, check that you didn't omit the .value property; if you did, you just overwrote the Datapoint itself with the value you wanted to assign to it!
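This pitfall is easy to reproduce in plain Python. The toy Datapoint class below is not the pKNyX one: it only assumes that dp behaves like a dictionary and that .value is a property with a setter, and it records the writes in a list instead of sending them to the bus.

```python
class Datapoint(object):
    """Toy datapoint: setting .value records a (name, value) 'bus' write."""

    def __init__(self, name, default, bus_log):
        self._name = name
        self._value = default
        self._bus_log = bus_log

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self._bus_log.append((self._name, new_value))


bus_log = []
dp = {"cmd": Datapoint("cmd", "On", bus_log)}

dp["cmd"].value = "Off"          # correct: goes through the property setter
print(bus_log)                   # [('cmd', 'Off')]

dp["cmd"] = "Off"                # mistake: replaces the Datapoint by a str
print(type(dp["cmd"]).__name__)  # str
```

After the second assignment, nothing more is recorded: the Datapoint object is gone, and the dictionary only holds a string.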
The last method manages the delay Datapoint:
@notify.datapoint(dp="delay", condition="change")
def delayChanged(self, event):
    if self._timer:
        delay = self.dp["delay"].value
        # Use the logger instance imported from pknyx.api
        logger.info("%s: delay changed; restart timer" % self._name)
        self._timer = delay
I think you got the point ;o)
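If you want to play with this countdown logic without a bus, the three handlers can be copied into a small standalone class. This is only a test sketch: the TimerLogic class and its attribute names are made up, plain attributes stand in for the Datapoint-s, and update_timer() is called by hand instead of by the scheduler.

```python
class TimerLogic(object):
    """Bus-free copy of the TimerFB countdown logic, for experimenting."""

    def __init__(self, delay=10):
        self.delay = delay   # stands in for the "delay" Datapoint
        self.cmd = None      # stands in for the "cmd" Datapoint
        self._timer = 0

    def state_changed(self, new_value):
        if new_value == "On":
            self._timer = self.delay   # start the timer
        elif new_value == "Off":
            self._timer = 0            # cancel the timer

    def delay_changed(self, new_delay):
        self.delay = new_delay
        if self._timer:
            self._timer = new_delay    # restart a running timer

    def update_timer(self):
        """One scheduler tick (one second in the real Functional Block)."""
        if self._timer:
            self._timer -= 1
            if not self._timer:
                self.cmd = "Off"


logic = TimerLogic(delay=10)
logic.state_changed("On")
ticks = 0
while logic.cmd != "Off":
    logic.update_timer()
    ticks += 1
print(ticks)  # 10
```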
Now we need to implement our Device, in timer/device.py:
# -*- coding: utf-8 -*-

from pknyx.api import Device

from timerFB import TimerFB
A few imports; the most important one is, of course, our custom Functional Block.
Then, we implement our Device:
class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timer_fb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timer_fb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timer_fb", dp="delay", gad="1/3/1")

    DESC = "Timer"
As you can see, it inherits from the Device class. As for the Functional Block, things are defined through class attributes; pKNyX, like Python, tries to limit the number of paradigms.
We first need to tell which Functional Block(-s) our device will use. This is done by creating a dict, whose name must start with FB_. Here, we only have one Functional Block.
Then, we weave our Datapoint-s (our Group Object-s, in fact) to our Group Address-es. Again, simple dicts are used, with names starting with LNK_.
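Since each link refers to its Functional Block by name, a typo in the fb value is an easy mistake to make. Here is a sketch of a consistency check you could run on such a class; the check_links() helper and the DemoDevice class are hypothetical, and nothing here says whether pKNyX performs this kind of check itself.

```python
def check_links(device_cls):
    """Return the names of the LNK_ entries whose 'fb' matches no FB_ 'name'."""
    attrs = vars(device_cls)
    fb_names = set(v["name"] for k, v in attrs.items() if k.startswith("FB_"))
    return sorted(k for k, v in attrs.items()
                  if k.startswith("LNK_") and v["fb"] not in fb_names)


class DemoDevice(object):
    FB_01 = dict(cls=None, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timer_fb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timerfb", dp="state", gad="1/2/1")  # typo: missing "_"


print(check_links(DemoDevice))  # ['LNK_02']
```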
The last thing we need to do is to tell pKNyX which class implements our Device:
DEVICE = Timer
Complex example
Have a look at pknyx/examples/3_weather/ to see how to use several Functional Block-s within the same Device.
Conclusion
That's it for now with this tutorial. This is the first working dev. release; there are many additional things to do, and the final implementation may change, according to the feedback/suggestions I get. But the core is there. Again, the goal of the framework is to provide very high level tools to build complete and powerful applications and KNX extensions. Unlike with linknx, you have a little more to do than simply writing a configuration file. But you will find that it is not that complicated (pKNyX handles all the nasty stuff for you), and it may even simplify things for complex tasks (linknx rules are not that simple in this case).