Functional Blocks
This is the central feature of pKNyX: it lets users create virtual devices which mimic real KNX devices. The Device itself is implemented as a process.
Here is a very simple example which implements a timer. This timer can monitor the state of a light, and switch it off automatically after a delay. The code is taken from the examples/3_timer directory of the pKNyX sources.
    from pknyx.api import Logger
    from pknyx.api import FunctionalBlock, Stack, ETS
    from pknyx.api import Scheduler, Notifier

    NAME = "timer"
    IND_ADDR = "1.2.3"
    LEVEL = "info"

    # ETS group address map
    GAD_MAP = {"1": dict(name="lights", desc="Lights"),
               "1/1": dict(name="lights_cmds", desc="Lights commands"),
               "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
               "1/2": dict(name="states", desc="Lights states"),
               "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
               "1/3": dict(name="lights_delays", desc="Lights delays"),
               "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
              }

    logger = Logger("%s-%s" % (NAME, IND_ADDR))
    logger.setLevel(LEVEL)

    stack = Stack(individualAddress=IND_ADDR)
    ets = ETS(stack=stack, gadMap=GAD_MAP)

    schedule = Scheduler()
    notify = Notifier()


    class TimerFB(FunctionalBlock):
        """ Timer functional block
        """

        # Datapoints definition
        DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
        DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
        DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

        GO_01 = dict(dp="cmd", flags="CWT", priority="low")
        GO_02 = dict(dp="state", flags="CWUI", priority="low")
        GO_03 = dict(dp="delay", flags="CWU", priority="low")

        DESC = "Timer"

        def _init(self):
            """ Additional init of the timer
            """
            self._timer = 0

        @notify.datapoint(dp="state", condition="change")
        def stateChanged(self, event):
            """ Method called when the 'state' datapoint changes
            """
            logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

            if event['newValue'] == "On":
                delay = self.dp["delay"].value
                Logger().info("%s: start timer for %ds" % (self._name, delay))
                self._timer = delay
            elif event['newValue'] == "Off":
                if self._timer:
                    Logger().info("%s: switched off detected; cancel timer" % self._name)
                    self._timer = 0

        @schedule.every(seconds=1)
        def updateTimer(self):
            """ Method called every second.
            """
            if self._timer:
                self._timer -= 1
                if not self._timer:
                    logger.info("%s: timer expired; switch off" % self._name)
                    self.dp["cmd"].value = "Off"

        @notify.datapoint(dp="delay", condition="change")
        def delayChanged(self, event):
            """ Method called when the 'delay' datapoint changes
            """
            logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

            # If the timer is running, we reset it to the new delay
            if self._timer:
                delay = self.dp["delay"].value
                Logger().info("%s: delay changed; restart timer" % self._name)
                self._timer = delay


    def main():

        # Register functional block
        ets.register(TimerFB, name="timer", desc="")

        # Weave datapoints
        ets.weave(fb="timer", dp="cmd", gad="1/1/1")
        ets.weave(fb="timer", dp="state", gad="1/2/1")
        ets.weave(fb="timer", dp="delay", gad="1/3/1")

        print ets.printGroat("gad")
        print ets.printGroat("go")
        print schedule.printJobs()
        print

        # Run the stack main loop (blocking call)
        stack.mainLoop()


    if __name__ == "__main__":
        try:
            main()
        except:
            logger.exception("3_main")
That's it! As you can see, the concepts used here are simple. This Functional Block can now be used from any real device of your installation, through the Group Addresses 1/1/1, 1/2/1 and 1/3/1. All you have to do is weave (link, bind...) the Group Objects of your real devices to these Group Addresses, using the real ETS application. Of course, you may have to change the GADs to match your installation.
OK, now let's start this device (invoke the Python interpreter as for any other Python script):
    MainThread::Logger_.__init__(): start new logger 'timer-1.2.3'
    MainThread::Scheduler started

    GAD                               Datapoint  Functional block  DPTID  Flags  Priority
    -------------------------------------------------------------------------------------
    1 Lights
    ├── 1 Lights commands
    │   ├── 1 Light 'test' (cmd)      cmd        timer             1.001  CWT    low
    ├── 2 Lights States
    │   ├── 1 Ligh 'test' (state)     state      timer             1.001  CWUI   low
    ├── 3 Lights delays
    │   ├── 1 Light 'test' (delay)    delay      timer             7.005  CWU    low

    Functional block  Datapoint  DPTID  GAD    Flags  Priority
    ----------------------------------------------------------
    timer             delay      7.005  1/3/1  CWU    low
    timer             state      1.001  1/2/1  CWUI   low
    timer             cmd        1.001  1/1/1  CWT    low

    Jobstore default:
        TimerFB.updateTimer (trigger: interval[0:00:01], next run at: 2013-08-14 20:21:46.943959)

    MainThread::Stack running
At this point, your device is waiting for bus events!
Let's dig deeper into this example. First, we import some Python objects:
    from pknyx.api import Logger
    from pknyx.api import FunctionalBlock, Stack, ETS
    from pknyx.api import Scheduler, Notifier
These objects are all classes.
Let's define a few constants:
    NAME = "timer"
    IND_ADDR = "1.2.3"
    LEVEL = "info"

    # ETS group address map
    GAD_MAP = {"1": dict(name="lights", desc="Lights"),
               "1/1": dict(name="lights_cmds", desc="Lights commands"),
               "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
               "1/2": dict(name="states", desc="Lights states"),
               "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
               "1/3": dict(name="lights_delays", desc="Lights delays"),
               "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
              }
We then instantiate some high-level objects and helpers:
    logger = Logger("%s-%s" % (NAME, IND_ADDR))
    logger.setLevel(LEVEL)

    stack = Stack(individualAddress=IND_ADDR)
    ets = ETS(stack=stack, gadMap=GAD_MAP)

    schedule = Scheduler()
    notify = Notifier()
The Logger object is a global logger service, which lets you output any information you want, at different levels. pKNyX itself makes heavy use of this logger. Here, we set the info level to avoid too much output (the default level is trace, which logs everything).
The Stack object is used to communicate over the bus (the real bus, of course, but also a virtual bus). We can give it an Individual Address, to mimic the behaviour of real devices. This address is used as the Source Address when communicating over the bus. The address 0/0/0 is used by default.
Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind: if you don't see the telegrams you are expecting, the problem may be there.
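To make this rule concrete, here is a minimal standalone sketch. It does not use pKNyX at all: the Telegram tuple and the VirtualDevice class are hypothetical names, invented only to show why two devices sharing a source address cannot hear each other.

```python
from collections import namedtuple

# Hypothetical, simplified telegram (not pKNyX's real telegram class):
# source individual address, destination group address and payload.
Telegram = namedtuple("Telegram", ["src", "gad", "value"])


class VirtualDevice(object):
    """Toy device that ignores telegrams carrying its own source address."""

    def __init__(self, individual_address):
        self.individual_address = individual_address
        self.received = []

    def on_telegram(self, telegram):
        # Drop our own telegrams to avoid internal loops.
        if telegram.src == self.individual_address:
            return False
        self.received.append(telegram)
        return True


timer = VirtualDevice("1.2.3")
clone = VirtualDevice("1.2.3")   # same address as 'timer': drops its telegrams
other = VirtualDevice("1.2.4")   # distinct address: receives them

telegram = Telegram(src="1.2.3", gad="1/1/1", value="Off")
results = [dev.on_telegram(telegram) for dev in (timer, clone, other)]
print(results)   # [False, False, True]
```

Only the device with a different address keeps the telegram, which is the symptom to look for when expected telegrams never show up.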
The ETS object is a tool which works more or less like the real ETS application (see below). It needs one mandatory parameter: the Stack object. The gadMap parameter is optional; it is a simple dictionary which maps group addresses to names and descriptions. pKNyX does not need it to work, but it can make debugging easier. Note that it is not really used yet, but it will be in the future.
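As an illustration of how such a map can help when debugging, the following sketch turns a group address into a readable path. The describe() helper is hypothetical and is not part of pKNyX; it only relies on the dictionary layout shown above.

```python
GAD_MAP = {
    "1": dict(name="lights", desc="Lights"),
    "1/1": dict(name="lights_cmds", desc="Lights commands"),
    "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
}


def describe(gad, gad_map):
    """Return the descriptions of each level of a group address.

    Hypothetical helper: unknown levels are shown as '?' instead of
    raising an error.
    """
    parts = gad.split("/")
    labels = []
    for depth in range(1, len(parts) + 1):
        key = "/".join(parts[:depth])
        entry = gad_map.get(key)
        labels.append(entry["desc"] if entry else "?")
    return " > ".join(labels)


print(describe("1/1/1", GAD_MAP))   # Lights > Lights commands > Light 'test' (cmd)
print(describe("1/9/1", GAD_MAP))   # Lights > ? > ?
```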
The Scheduler is a helper implementing a powerful scheduler, based on APScheduler (see below).
The Notifier works the same way as the Scheduler, but provides bus notifications rather than time notifications (see below).
Now it is time to write the central part of our device: a custom Functional Block. This is done by subclassing the FunctionalBlock base class and adding a few attributes and methods:
    class TimerFB(FunctionalBlock):
        """ Timer functional block
        """

        # Datapoints definition
        DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
        DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
        DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

        GO_01 = dict(dp="cmd", flags="CWT", priority="low")
        GO_02 = dict(dp="state", flags="CWUI", priority="low")
        GO_03 = dict(dp="delay", flags="CWU", priority="low")

        DESC = "Timer"

        def _init(self):
            """ Additional init of the timer
            """
            self._timer = 0
The DP_xx class attributes are the Datapoints of our Functional Block. Datapoints are the internal variables of the functional block which can be exposed to the bus.
The GO_xx class attributes are the Group Objects, which map the Datapoints to the bus through the multicast service (Group Addresses).
Both are defined as Python dictionaries; the framework automatically instantiates them for us. The attribute names used here do not matter, as long as they start with DP_ for Datapoints and GO_ for Group Objects.
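The prefix convention can be pictured with a small standalone sketch. This is only one plausible way a framework could collect such attributes; the Block base class below is hypothetical, and the real FunctionalBlock may work differently.

```python
class Block(object):
    """Hypothetical base class that collects DP_* and GO_* descriptions."""

    def __init__(self):
        self.dp = {}   # datapoint name -> description
        self.go = {}   # datapoint name -> group object description
        for attr in dir(type(self)):
            # Only the prefix matters; the rest of the name is free.
            if attr.startswith("DP_"):
                desc = getattr(type(self), attr)
                self.dp[desc["name"]] = dict(desc)
            elif attr.startswith("GO_"):
                desc = getattr(type(self), attr)
                self.go[desc["dp"]] = dict(desc)


class TimerBlock(Block):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_anything = dict(name="delay", access="input", dptId="7.005", default=10)
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")


block = TimerBlock()
print(sorted(block.dp))   # ['cmd', 'delay']
print(sorted(block.go))   # ['cmd']
```

Note that DP_anything is picked up just like DP_01, since only the prefix is inspected.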
There are a few parameters in the dicts; some are obvious, some need more explanation, but that is beyond the scope of this tutorial.
The _init() method is called when the functional block is instantiated; it can be used to initialize additional variables or to perform initial tasks. Here, we just create the _timer variable and set it to 0.
OK, it's time to dig into the active part of our functional block. Let's start with the stateChanged() method:
        @notify.datapoint(dp="state", condition="change")
        def stateChanged(self, event):
            """ Method called when the 'state' datapoint changes
            """
            logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

            if event['newValue'] == "On":
                delay = self.dp["delay"].value
                Logger().info("%s: start timer for %ds" % (self._name, delay))
                self._timer = delay
            elif event['newValue'] == "Off":
                if self._timer:
                    Logger().info("%s: switched off detected; cancel timer" % self._name)
                    self._timer = 0
The role of this method is to start or stop the timer, depending on the state.
This method has a decorator. pKNyX does not use decorators the way they are usually used; here, the decorator registers the decorated method in the Notifier, which will call this method as soon as the state Datapoint changes because of bus activity.
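The idea of a decorator that registers a method instead of wrapping it can be sketched without pKNyX. MiniNotifier below is a toy written for this illustration, not the real Notifier; the 'oldValue' key of the event is an assumption (only 'newValue' appears in the example above).

```python
class MiniNotifier(object):
    """Toy registry; an illustration only, not pKNyX's Notifier."""

    def __init__(self):
        self.handlers = {}   # datapoint name -> list of functions

    def datapoint(self, dp):
        def decorator(func):
            # Record the function and return it unchanged (no wrapping).
            self.handlers.setdefault(dp, []).append(func)
            return func
        return decorator

    def fire(self, obj, dp, old_value, new_value):
        """Call the registered handlers, but only if the value changed."""
        if old_value == new_value:
            return 0
        event = dict(dp=dp, oldValue=old_value, newValue=new_value)
        handlers = self.handlers.get(dp, [])
        for func in handlers:
            func(obj, event)
        return len(handlers)


notify = MiniNotifier()


class Lamp(object):
    def __init__(self):
        self.log = []

    @notify.datapoint(dp="state")
    def state_changed(self, event):
        self.log.append(event["newValue"])


lamp = Lamp()
notify.fire(lamp, "state", "Off", "On")   # handler is called
notify.fire(lamp, "state", "On", "On")    # no change: nothing happens
print(lamp.log)   # ['On']
```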
Note how we retrieve the Datapoint value: our functional block has a dp dictionary, which contains all our Datapoints; the keys are the names we used to describe them in the DP_xx dictionaries. To get the value of a Datapoint, just use its .value property.
Let's have a look at the timer handling:
        @schedule.every(seconds=1)
        def updateTimer(self):
            """ Method called every second.
            """
            if self._timer:
                self._timer -= 1
                if not self._timer:
                    logger.info("%s: timer expired; switch off" % self._name)
                    self.dp["cmd"].value = "Off"
Note how this method is called periodically, using the schedule.every() method as a Python decorator, as for the Notifier. This decorator automatically registers our method and calls it every second.
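For readers who want to see the mechanism in isolation, here is a toy scheduler driven by explicit ticks rather than by a real clock. MiniScheduler is a hypothetical class written for this page; the real Scheduler relies on APScheduler and runs jobs on its own.

```python
class MiniScheduler(object):
    """Toy scheduler driven by explicit ticks instead of a real clock."""

    def __init__(self):
        self.jobs = []     # list of (interval in seconds, function)
        self.elapsed = 0   # simulated time, in seconds

    def every(self, seconds):
        def decorator(func):
            # Register the job and hand the function back untouched.
            self.jobs.append((seconds, func))
            return func
        return decorator

    def tick(self, obj):
        """Advance the simulated clock by one second and run due jobs."""
        self.elapsed += 1
        for seconds, func in self.jobs:
            if self.elapsed % seconds == 0:
                func(obj)


schedule = MiniScheduler()


class Counter(object):
    def __init__(self):
        self.fast = 0
        self.slow = 0

    @schedule.every(seconds=1)
    def each_second(self):
        self.fast += 1

    @schedule.every(seconds=5)
    def each_five_seconds(self):
        self.slow += 1


counter = Counter()
for _ in range(10):
    schedule.tick(counter)
print(counter.fast, counter.slow)   # 10 2
```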
There, we just decrement the timer (if it is not zero), and check whether the delay has expired. If so, we change the cmd Datapoint by assigning a new value to its .value property. When we do this, pKNyX automatically notifies the bus of the change, according to the flags of the corresponding GO_xx object! If things do not work as expected, check that you didn't omit the .value property; if you did, you simply overwrite the Datapoint itself with the value you wanted to assign to it!
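The pitfall is easy to reproduce with a plain Python property. The Datapoint class below is a deliberately tiny stand-in (the real pKNyX class is richer), and the 'bus' list is just a placeholder for the real bus.

```python
class Datapoint(object):
    """Toy datapoint whose setter records a notification."""

    def __init__(self, name, default, sent):
        self.name = name
        self._value = default
        self._sent = sent   # list standing in for the bus

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        # Assigning through the property is what triggers the notification.
        self._sent.append((self.name, new_value))


bus = []
dp = {"cmd": Datapoint("cmd", "On", bus)}

dp["cmd"].value = "Off"   # correct: the setter runs, the bus is notified
print(bus)                # [('cmd', 'Off')]

dp["cmd"] = "Off"         # mistake: the Datapoint is replaced by a string
print(type(dp["cmd"]).__name__, bus)   # str [('cmd', 'Off')]
```

After the mistaken assignment, nothing more is sent and the dictionary no longer holds a Datapoint at all.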
The last method just manages the delay Datapoint:
        @notify.datapoint(dp="delay", condition="change")
        def delayChanged(self, event):
            """ Method called when the 'delay' datapoint changes
            """
            logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

            # If the timer is running, we reset it to the new delay
            if self._timer:
                delay = self.dp["delay"].value
                Logger().info("%s: delay changed; restart timer" % self._name)
                self._timer = delay
I think you got the point ;o)
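To check the interplay of the three methods without a bus, the countdown logic can be exercised on its own. This is a sketch under stated assumptions: TimerLogic and its method names are made up for the test, and ticks are issued by hand instead of by the scheduler.

```python
class TimerLogic(object):
    """Countdown logic only; names are illustrative, not pKNyX API."""

    def __init__(self, delay):
        self.delay = delay
        self.remaining = 0
        self.commands = []   # values we would write to the 'cmd' datapoint

    def state_changed(self, new_value):
        if new_value == "On":
            self.remaining = self.delay   # arm the countdown
        elif new_value == "Off":
            self.remaining = 0            # cancel it

    def delay_changed(self, new_delay):
        self.delay = new_delay
        if self.remaining:                # restart only if running
            self.remaining = new_delay

    def tick(self):
        if self.remaining:
            self.remaining -= 1
            if not self.remaining:
                self.commands.append("Off")


timer = TimerLogic(delay=3)
timer.state_changed("On")
timer.tick()               # 2 s left
timer.delay_changed(5)     # running, so restart with 5 s
for _ in range(4):
    timer.tick()
print(timer.remaining, timer.commands)   # 1 []
timer.tick()
print(timer.remaining, timer.commands)   # 0 ['Off']
```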
OK, we need to write a few more things to get our device working. First, we need to register the functional block:
    def main():

        # Register functional block
        ets.register(TimerFB, name="timer", desc="")
and use the ETS object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to our group addresses:
        # Weave datapoints
        ets.weave(fb="timer", dp="cmd", gad="1/1/1")
        ets.weave(fb="timer", dp="state", gad="1/2/1")
        ets.weave(fb="timer", dp="delay", gad="1/3/1")
We can print a summary of our mapping:
        print ets.printGroat("gad")
        print ets.printGroat("go")
        print schedule.printJobs()
        print
And finally, launch the framework main loop:
        # Run the stack main loop (blocking call)
        stack.mainLoop()
We now have a process (Device) running and listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the group addresses they are woven to.
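Conceptually, weaving builds a table from group addresses to datapoints, and incoming group writes are dispatched through it. The sketch below shows that idea only; MiniETS and group_write() are hypothetical, and plain dictionaries stand in for functional blocks.

```python
class MiniETS(object):
    """Toy link table; an illustration only, not pKNyX's ETS class."""

    def __init__(self):
        self.links = {}   # group address -> list of (block name, datapoint)

    def weave(self, fb, dp, gad):
        self.links.setdefault(gad, []).append((fb, dp))

    def group_write(self, gad, value, blocks):
        """Deliver a value to every datapoint linked to 'gad'."""
        targets = self.links.get(gad, [])
        for fb, dp in targets:
            blocks[fb][dp] = value
        return targets


ets = MiniETS()
ets.weave(fb="timer", dp="state", gad="1/2/1")
ets.weave(fb="timer", dp="delay", gad="1/3/1")

blocks = {"timer": {"state": "Off", "delay": 10}}
ets.group_write("1/2/1", "On", blocks)   # reaches timer.state
ets.group_write("1/9/9", "On", blocks)   # no link: silently ignored
print(blocks["timer"] == {"state": "On", "delay": 10})   # True
```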
That's it for now. This is the first working development release; there are many additional things to do, and the final implementation may change according to the feedback and suggestions I get. But the core is there. Again, the goal of the framework is to provide very high-level tools to build complete and powerful applications and KNX extensions. Unlike linknx, you have a little more to do than simply write a configuration file. But you will find that it is not that complicated (pKNyX handles all the nasty stuff for you), and it may even simplify things for complex tasks (linknx rules are not that simple).