rasolar/sds.py

166 lines
4.9 KiB
Python
Raw Normal View History

2019-06-10 01:19:22 -07:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# SDS011_Feinstaub_Sensor.py
#
# Copyright 2017 by luetzel <webmaster_at_raspberryblog.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
from __future__ import print_function
import serial, struct, sys, time
import socket,json,requests
ser = serial.Serial()
#ser.port = sys.argv[1]
ser.port = "/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0"
#ser.port = "/dev/ttyUSB5"
ser.baudrate = 9600
sensebox_id="5cf8a49707460b001b43a245"
sensebox_25="5cf8a49707460b001b43a247"
sensebox_10="5cfc0d44a1ba9f001a6e57ae"
url="https://ingress.opensensemap.org/boxes/%s/%s"
pm=[0,0]
pm_count=0
ser.open()
ser.flushInput()
def dump_data(d):
print(' '.join(x.encode('hex') for x in d))
def process_frame(d):
#dump_data(d) #debug
r = struct.unpack('<HHxxBBB', d[2:])
pm25 = r[0]/10.0
pm10 = r[1]/10.0
print(pm25)
checksum = sum(ord(v) for v in d[2:8])%256
out=(0,0)
if (checksum==r[2] and r[3]==0xab):
out=(pm25,pm10)
return out
def export_data(pm25,pm10):
print("PM 2.5: {} μg/m^3 PM 10: {} μg/m^3".format(pm25, pm10))
r = requests.post(url % (sensebox_id,sensebox_25),json={'value': pm25})
if r.status_code != requests.codes.ok:
print("Error %d: %s" % (r.status_code,r.text))
r = requests.post(url % (sensebox_id,sensebox_10),json={'value': pm10})
if r.status_code != requests.codes.ok:
print("Error %d: %s" % (r.status_code,r.text))
json_out={"time": int(time.time()*1000),"device": "ragps","payload":{"pm25": int(pm25*1000),"pm10": int(pm10*1000)}}
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except:
print("{}: could not connect to database".format(time.time()))
else:
s.connect(("banana", 24048))
s.sendall(json.dumps(json_out))
s.close()
def sensor_read():
byte = 0
while byte != "\xaa":
byte = ser.read(size=1)
d = ser.read(size=10)
out=(0,0)
if d[0] == "\xc0":
out=process_frame(byte + d)
return out
# 0xAA, 0xB4, 0x06, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x06, 0xAB
def sensor_wake():
bytes = ['\xaa', #head
'\xb4', #command 1
'\x06', #data byte 1
'\x01', #data byte 2 (set mode)
'\x01', #data byte 3 (sleep)
'\x00', #data byte 4
'\x00', #data byte 5
'\x00', #data byte 6
'\x00', #data byte 7
'\x00', #data byte 8
'\x00', #data byte 9
'\x00', #data byte 10
'\x00', #data byte 11
'\x00', #data byte 12
'\x00', #data byte 13
'\xff', #data byte 14 (device id byte 1)
'\xff', #data byte 15 (device id byte 2)
'\x05', #checksum
'\xab'] #tail
for b in bytes:
ser.write(b)
# xAA, 0xB4, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x05, 0xAB
def sensor_sleep():
bytes = ['\xaa', #head
'\xb4', #command 1
'\x06', #data byte 1
'\x01', #data byte 2 (set mode)
'\x00', #data byte 3 (sleep)
'\x00', #data byte 4
'\x00', #data byte 5
'\x00', #data byte 6
'\x00', #data byte 7
'\x00', #data byte 8
'\x00', #data byte 9
'\x00', #data byte 10
'\x00', #data byte 11
'\x00', #data byte 12
'\x00', #data byte 13
'\xff', #data byte 14 (device id byte 1)
'\xff', #data byte 15 (device id byte 2)
'\x05', #checksum
'\xab'] #tail
for b in bytes:
ser.write(b)
def main(args):
a = 2
while a > 1:
sensor_wake()
time.sleep(30)
ser.flushInput()
pm25=0
pm10=0
pm_count=0
for n in range(5):
pm_data=sensor_read()
if(pm_data[0]>0):
pm25=pm25+pm_data[0]
pm10=pm10+pm_data[1]
pm_count=pm_count+1
time.sleep(2)
export_data(round(pm25/pm_count,1),round(pm10/pm_count,1))
# sensor_sleep()
time.sleep(600)
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv))