[[PageOutline(2-5, Table of Contents, floated)]]

= Tutorial =

== Architecture ==

== Vocabulary ==

 * Device
 * Functional Block
 * Datapoint
 * Group Object
 * Flags
 * ETS
 * Stack
 * Group Address

== Functional Blocks ==

This is the central feature of '''pKNyX''': it lets the user create virtual devices which mimic real KNX devices. The Device itself is implemented as a process.

Here is a very simple example which implements a timer. This timer can monitor the state of a light, and switch it off automatically after a delay. The code is taken from the {{{examples/3_timer}}} directory of the '''pKNyX''' sources.

{{{
#!python
from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

NAME = "timer"
IND_ADDR = "1.2.3"
LEVEL = "info"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="lights_cmds", desc="Lights commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="Lights states"),
           "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
           "1/3": dict(name="lights_delays", desc="Lights delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }

logger = Logger("%s-%s" % (NAME, IND_ADDR))
logger.setLevel(LEVEL)

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()


class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    # Group Objects definition
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay


def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")

    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")

    print ets.printGroat("gad")
    print ets.printGroat("go")
    print schedule.printJobs()
    print

    # Run the stack main loop (blocking call)
    stack.mainLoop()


if __name__ == "__main__":
    try:
        main()
    except:
        logger.exception("3_main")
}}}

That's it! As you can see, the concepts used here are simple...

This Functional Block can now be used from any other real device of your installation, through the Group Addresses {{{1/1/1}}}, {{{1/2/1}}} and {{{1/3/1}}}. All you have to do is weave (link, bind...) the Group Objects of your real devices to these Group Addresses, using the real '''ETS''' application. Of course, you may have to change the GAD to match your installation.

Ok, now, let's start this device (invoke the python interpreter as for any other python script):

{{{
MainThread::Logger_.__init__(): start new logger 'timer-1.2.3'
MainThread::Scheduler started
GAD                                     Datapoint           Functional block    DPTID     Flags     Priority
-----------------------------------------------------------------------------------------------------------------------------
 1 Lights
 ├── 1 Lights commands
 │   ├── 1 Light 'test' (cmd)           cmd                 timer               1.001     CWT       low
 ├── 2 Lights States
 │   ├── 1 Ligh 'test' (state)          state               timer               1.001     CWUI      low
 ├── 3 Lights delays
 │   ├── 1 Light 'test' (delay)         delay               timer               7.005     CWU       low

Functional block    Datapoint           DPTID     GAD       Flags     Priority
------------------------------------------------------------------------------------------------------------------------
timer               delay               7.005     1/3/1     CWU       low
timer               state               1.001     1/2/1     CWUI      low
timer               cmd                 1.001     1/1/1     CWT       low

Jobstore default:
    TimerFB.updateTimer (trigger: interval[0:00:01], next run at: 2013-08-14 20:21:46.943959)

MainThread::Stack running
}}}

Here, your device waits for bus events!

Let's dig deeper into this example. First, we import some python objects:

{{{
#!python
from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier
}}}

These objects are all classes.

Let's define a few constants:

{{{
#!python
NAME = "timer"
IND_ADDR = "1.2.3"
LEVEL = "info"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="lights_cmds", desc="Lights commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="Lights states"),
           "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"),
           "1/3": dict(name="lights_delays", desc="Lights delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }
}}}

We then instantiate some high level objects and helpers:

{{{
#!python
logger = Logger("%s-%s" % (NAME, IND_ADDR))
logger.setLevel(LEVEL)

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()
}}}

The {{{Logger}}} object is a global logger service, which lets you output any information you want, at different levels. '''pKNyX''' makes heavy use of that logger. Here, we set the {{{info}}} level to avoid too much output (the default level is {{{trace}}}, which logs everything).

The {{{Stack}}} object is used to communicate over the bus (real bus, of course, but also virtual bus). We can give it an Individual Address, to mimic the behaviour of real devices. This address will be used as Source Address when communicating over the bus. The address {{{0/0/0}}} is used by default.

'''Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind: if you don't see the telegrams you are expecting, the problem may be there.'''

The {{{ETS}}} object is a tool which works more or less like the real ETS application (see below). It needs one mandatory parameter: the {{{Stack}}} object. The {{{gadMap}}} parameter is optional, and is a simple dictionary which maps group addresses to names/descriptions.
'''pKNyX''' does not need this to work, but it can make debugging easier. Note that it is not really used now, but it will be in the future.

The {{{Scheduler}}} is a helper implementing a powerful scheduler, based on [http://pythonhosted.org/APScheduler APScheduler] (see below).

The {{{Notifier}}} works the same way the {{{Scheduler}}} does, but provides bus notifications rather than time notifications (see below).

Now, it is time to write the central part of our device: a custom Functional Block. This is done by subclassing the '''!FunctionalBlock''' base class, and adding a few attributes/methods:

{{{
#!python
class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    # Group Objects definition
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0
}}}

The {{{DP_xx}}} class attributes are the '''Datapoints''' of our Functional Block. Datapoints are the internal variables of the functional block which can be exposed to the bus. The {{{GO_xx}}} class attributes are the '''Group Objects''', which map the Datapoints to the bus through the multicast service (Group Address).

Both are defined as python dictionaries; the framework automatically instantiates them for us. The names used here do not matter, as long as they start with {{{DP_}}} for Datapoints, and {{{GO_}}} for Group Objects.

There are a few parameters in the dicts; some are obvious, some need more explanation, but this is out of the scope of this tutorial.
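
To give an idea of how class attributes can be picked up by prefix, here is a minimal sketch in plain Python. It is only an illustration, not the actual '''pKNyX''' code: the {{{Block}}} class and its {{{collect()}}} method are hypothetical names invented for this example.

{{{
#!python
class Block(object):
    """ Hypothetical stand-in for a base class (NOT pKNyX's FunctionalBlock)
    """

    @classmethod
    def collect(cls, prefix):
        """ Return {attribute name: dict} for all class attributes starting with prefix
        """
        found = {}
        for attr in dir(cls):
            value = getattr(cls, attr)
            if attr.startswith(prefix) and isinstance(value, dict):
                found[attr] = value
        return found


class TimerFB(Block):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    DESC = "Timer"  # ignored: neither a dict nor a matching prefix


datapoints = TimerFB.collect("DP_")
groupObjects = TimerFB.collect("GO_")

# Only the 'name' key matters to find a datapoint later; DP_01/DP_02 are arbitrary
dpNames = sorted(desc["name"] for desc in datapoints.values())
}}}

This shows why the suffix after {{{DP_}}} or {{{GO_}}} is free: only the prefix is used to find the definitions, and the datapoint is later looked up by its {{{name}}} entry.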

The {{{_init()}}} method is called when the functional block is instantiated; it can be used to initialise some additional variables, or to perform initial tasks. Here, we just create the {{{_timer}}} variable, and set it to 0.

Ok, it's time to dig into the active part of our functional block. Let's start with the {{{stateChanged()}}} method:

{{{
#!python
    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0
}}}

The role of this method is to start/stop the timer depending on the state.

This method has a '''decorator'''. '''pKNyX''' does not use decorators for their usual purpose; here, the decorator registers the decorated method in the {{{Notifier}}}, which will call this method as soon as the ''state'' Datapoint changes because of bus activity.

Note how we retrieve the Datapoint value: our functional block has a {{{dp}}} dictionary, which contains all our datapoints; the keys are the names we used to describe them in the {{{DP_xx}}} dictionaries. To get the value of a Datapoint, just use its {{{.value}}} property.

Let's have a look at the timer handling:

{{{
#!python
    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"
}}}

Note how this method is called periodically, using the {{{schedule.every()}}} method as a python decorator, as for the {{{Notifier}}}. This decorator will automatically register our method and call it every second.
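
The idea of a decorator that registers a method instead of wrapping it can be illustrated without '''pKNyX''' at all. The sketch below only shows the general technique, and is not the real {{{Scheduler}}} or {{{Notifier}}} implementation: {{{Registry}}}, {{{runOnce()}}} and {{{Counter}}} are made-up names, and no real timing is involved.

{{{
#!python
class Registry(object):
    """ Hypothetical registry (pKNyX's Scheduler and Notifier are more elaborate)
    """

    def __init__(self):
        self.jobs = []

    def every(self, seconds):
        """ Return a decorator which records (function, period)
        """
        def decorator(func):
            self.jobs.append((func, seconds))
            return func  # the function itself is returned unchanged
        return decorator

    def runOnce(self, obj):
        """ Call every registered function once, on the given object
        """
        for func, seconds in self.jobs:
            func(obj)


schedule = Registry()


class Counter(object):
    def __init__(self):
        self.ticks = 0

    @schedule.every(seconds=1)
    def tick(self):
        self.ticks += 1


counter = Counter()
schedule.runOnce(counter)  # a real scheduler would do this from a timer thread
schedule.runOnce(counter)
}}}

The decorated method stays a normal method; the only side effect of the decorator is the entry added to the registry when the class body is executed.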

In {{{updateTimer()}}}, we just decrement the timer (if it is not zero), and check if the delay has expired. If so, we change the ''cmd'' Datapoint value by assigning a new value to its {{{.value}}} property. When we do this, '''pKNyX''' automatically notifies the bus of the change, according to the flags of the corresponding {{{GO_xx}}} object!

If things do not work as expected, check that you didn't omit the {{{.value}}} property; if you did, you simply overwrite the Datapoint itself with the value you wanted to assign to it!

The last method just manages the ''delay'' Datapoint:

{{{
#!python
    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay
}}}

I think you got the point ;o)

Ok, we need to write a few more things to get our device working. First, we need to register the functional block:

{{{
#!python
def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")
}}}

and use the {{{ETS}}} object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to our group addresses:

{{{
#!python
    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")
}}}

We can print a summary of our mapping:

{{{
#!python
    print ets.printGroat("gad")
    print ets.printGroat("go")
    print schedule.printJobs()
    print
}}}

And finally, launch the framework main loop:

{{{
#!python
    # Run the stack main loop (blocking call)
    stack.mainLoop()
}}}

We now have a process (Device) running, listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the group addresses they are woven to.
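
Before connecting the device to a real bus, the countdown logic can be exercised on its own. The following is only a sketch: {{{TimerLogic}}} is a hypothetical, bus-free copy of the logic of {{{TimerFB}}}, where plain attributes stand in for the ''delay'' and ''cmd'' Datapoints and the methods are called by hand instead of by the {{{Notifier}}} and the {{{Scheduler}}}.

{{{
#!python
class TimerLogic(object):
    """ Bus-free copy of the TimerFB countdown logic, for testing only
    """

    def __init__(self, delay):
        self.delay = delay  # plays the role of the 'delay' datapoint
        self.cmd = None     # last value written to the 'cmd' datapoint
        self._timer = 0

    def stateChanged(self, newValue):
        """ Start the timer on "On", cancel it on "Off"
        """
        if newValue == "On":
            self._timer = self.delay
        elif newValue == "Off":
            self._timer = 0

    def updateTimer(self):
        """ One tick; to be called once per simulated second
        """
        if self._timer:
            self._timer -= 1
            if not self._timer:
                self.cmd = "Off"


timer = TimerLogic(delay=3)
timer.stateChanged("On")

# Simulate 4 seconds and record (remaining time, cmd value) after each tick
history = []
for second in range(4):
    timer.updateTimer()
    history.append((timer._timer, timer.cmd))
}}}

With a delay of 3, the ''cmd'' value is only set to "Off" on the third tick, and further ticks do nothing until the state goes "On" again.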

That's it for now. This is the first working dev. release; there are many additional things to do, and the final implementation may change, according to the feedback/suggestions I get. But the core is there.

Again, the goal of the framework is to provide very high level tools to build complete and powerful applications and KNX extensions. Unlike '''linknx''', you have a little more to do than simply write a configuration file. But you will find that it is not that complicated ('''pKNyX''' handles all the nasty stuff for you), and it may even simplify complex tasks (linknx rules are not that simple).