#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "Sébastien Reuiller"
# __licence__ = "Apache License 2.0"

# Python 3, prerequisites: pip install pySerial influxdb
# Enedis TIC reference: Enedis-NOI-CPT_54E.pdf
# https://www.enedis.fr/media/2035/download
import logging
import time
from datetime import datetime, timezone

import requests
import serial
from influxdb import InfluxDBClient

import settings as cfg

# Labels to forward to InfluxDB - every other label is dropped.
USED_MESURE_KEYS = [
    'LTARF',
    'EAST',
    'EASF01',
    'EASF02',
    'IRMS1',
    'URMS1',
    'SINSTS',
    'SMAXSN',
    'CCASN',
    'UMOY1',
    'MSG1',
    'NTARF',
]

# Label prefixes whose value is converted to int before being stored.
INT_MESURE_KEYS = [
    'EAST',
    'EASF',
    'EASD',
    'EAIT',
    'ERQ',
    'IRMS',
    'URMS',
    'PREF',
    'PCOUP',
    'SINST',
    'SMAX',
    'CCA',
    'UMOY',
]

# str.startswith() accepts a tuple, so build it once instead of looping.
_INT_MESURE_PREFIXES = tuple(INT_MESURE_KEYS)

# Logger setup.
logging.basicConfig(filename=cfg.LOGFILE, level=logging.INFO,
                    format='%(asctime)s %(message)s')
logging.info("Teleinfo starting..")

# Connection to the InfluxDB database; the password is only passed when set.
if cfg.influxdb["PASSWORD"]:
    client = InfluxDBClient(host=cfg.influxdb["HOST"],
                            port=cfg.influxdb["PORT"],
                            username=cfg.influxdb["USER"],
                            password=cfg.influxdb["PASSWORD"])
else:
    client = InfluxDBClient(host=cfg.influxdb["HOST"],
                            port=cfg.influxdb["PORT"],
                            username=cfg.influxdb["USER"])

# Retry every 5 seconds until the server answers, creating the database
# on first use.
connected = False
while not connected:
    try:
        logging.info("Database %s exists?", cfg.influxdb["DB_NAME"])
        if {'name': cfg.influxdb["DB_NAME"]} not in client.get_list_database():
            logging.info("Database %s creation..", cfg.influxdb["DB_NAME"])
            client.create_database(cfg.influxdb["DB_NAME"])
            logging.info("Database %s created!", cfg.influxdb["DB_NAME"])
        client.switch_database(cfg.influxdb["DB_NAME"])
        logging.info("Connected to %s!", cfg.influxdb["DB_NAME"])
    except requests.exceptions.ConnectionError:
        logging.info('InfluxDB is not reachable. Waiting 5 seconds to retry.')
        time.sleep(5)
    else:
        connected = True


def add_measures(measures):
    """Write one frame of measures to InfluxDB.

    Args:
        measures: mapping of measurement name to its dict of fields.

    Every point of the frame carries the same UTC timestamp (second
    resolution), computed once per call. Nothing is written when
    ``measures`` is empty.
    """
    if not measures:
        return

    # One timestamp for the whole frame, so its points cannot end up on
    # two different seconds.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    points = [
        {
            "measurement": measure,
            "tags": {
                # identifies the probe and the meter
                "host": "raspberry",
                "region": "linky"
            },
            "time": now,
            "fields": values
        }
        for measure, values in measures.items()
    ]
    client.write_points(points)


def verif_checksum(data, checksum):
    """Return True when ``checksum`` matches the checksum of ``data``.

    Args:
        data: text (str) over which the checksum is computed.
        checksum: the expected one-character checksum.

    The checksum is the sum of the character codes, truncated to its
    6 low bits, plus 32.
    """
    sum_unicode = (sum(ord(caractere) for caractere in data) & 63) + 32
    return checksum == chr(sum_unicode)


def checksum(line: bytes) -> str:
    """Return the checksum character of a raw dataset.

    Args:
        line: the dataset bytes the checksum is computed over (the caller
            passes the dataset without its trailing checksum byte).

    Returns:
        A one-character string: the byte sum, truncated to its 6 low
        bits, plus 0x20.
    """
    return chr((sum(line) & 0x3F) + 0x20)


def _parse_dataset(dataset):
    """Decode one checksum-validated dataset.

    Returns:
        ``(label, fields)`` where ``fields`` holds ``value`` and
        ``timestamp``, or ``None`` when the label is not listed in
        USED_MESURE_KEYS.
    """
    spline = dataset.split(b'\t')
    logging.debug(spline)
    etiquette = spline[0].decode('ascii')
    if etiquette not in USED_MESURE_KEYS:
        return None

    value = spline[1].decode('ascii')
    timestamp = None
    if len(spline) == 4 and spline[2] != b'':
        # Timestamped dataset: label, timestamp, value, checksum.
        value = spline[2].decode('ascii')
        timestamp = spline[1].decode('ascii')
    if etiquette.startswith(_INT_MESURE_PREFIXES):
        value = int(value)
    return etiquette, {"value": value, "timestamp": timestamp}


def main():
    """Read teleinfo frames from the serial port and push them to InfluxDB.

    Runs forever. Datasets are accumulated until the end-of-frame byte
    (0x03) is seen. A frame is written when at least cfg.SKIPPED_TRAMES
    frames have been skipped since the previous write; the others are
    discarded.
    """
    with serial.Serial(port=cfg.SERIAL, baudrate=9600,
                       parity=serial.PARITY_EVEN,
                       bytesize=serial.SEVENBITS, timeout=1) as ser:

        logging.info("Teleinfo is reading on %s..", cfg.SERIAL)

        trame = dict()

        # Skip input until a start-of-frame byte (0x02) is seen, so that
        # parsing begins on a frame boundary.
        line = ser.readline()
        while b'\x02' not in line:
            line = ser.readline()

        # Start "full" so the very first complete frame is written.
        delaycounter = cfg.SKIPPED_TRAMES
        while True:
            line = ser.readline()
            logging.debug(line)

            try:
                dataset = line.replace(b'\x03\x02', b'').rstrip(b'\r\n')

                # A read timeout or a blank line yields an empty dataset;
                # there is nothing to verify or parse in that case.
                if dataset:
                    if checksum(dataset[:-1]) == chr(dataset[-1]):
                        parsed = _parse_dataset(dataset)
                        if parsed is not None:
                            etiquette, fields = parsed
                            trame[etiquette] = fields
                    else:
                        logging.debug('Checksum error, aborting frame')

                if b'\x03' in line:
                    # End-of-frame byte seen: store the frame in InfluxDB.
                    if delaycounter >= cfg.SKIPPED_TRAMES:
                        time_measure = time.time()

                        # insert into InfluxDB
                        add_measures(trame)

                        # add a timestamp for debugging
                        trame["timestamp"] = int(time_measure)
                        logging.debug(trame)
                        delaycounter = 0
                    else:
                        delaycounter += 1

                    trame = dict()  # start over with a new frame
            except Exception as e:
                # Keep the reader alive whatever a single line contains.
                logging.error("Exception : %s", e, exc_info=True)
                logging.error("%s", line)


if __name__ == '__main__':
    if connected:
        main()