This code is from project: CC1101 Atmega32u USB dongle + python = RFkitten
rfkitten.py
#!/usr/bin/python
import serial
import threading
from time import sleep
from sys import exit,argv
from re import split, findall
from cc1101_config import *
import rf_analyzer
# define baud rate / needs to be global sice rf_analyzer.analyze_ook
# requires it
baud = 9600
# helper function to split data into smaller chunks, thanks to this
# you can grab longer packets before analyzing them
def split_by_zero(data):
# make binnary string e.g "1110001010101011011" out of data
bin_str = ''.join(format(ord(byte), '08b') for byte in data)
for splitted in split('0{20,}', bin_str):
if (len(splitted) > 0):
# make sure that we've got some zeroes at the end for regexp
splitted+="0" * 20
# analyze and print ook data
rf_analyzer.analyze_ook(splitted, baud, bin_input=True)
try:
# open serial port with max speed
s = serial.Serial(
port = argv[1],
baudrate = 4000000,
timeout = 0.01
)
# thread run flag
reader_run = 1
# define and start reading thread, you need to pass function that will
# fetch the data, here we have split_by_zero which is defined above
# with smaller fixed-packet lengths, you could directly pass rf_analyzer.analyze_ook here
t = threading.Thread(target=reader, args=(s, lambda: reader_run, split_by_zero))
#t = threading.Thread(target=reader, args=(s, lambda: reader_run, rf_analyzer.analyze_ook))
t.start()
# create modem configuration
cfg_final = join_config(
get_cfg_init(),
modulation("ASK/OOK"),
manchaster(0),
base_frequency(433.92),
sensivity("27 dB"),
channel_bandwidth(100),
data_rate(baud),
packet_len(128),
)
# send config to cc1101
push_config(s, cfg_final)
# here you can test transmit mode
if(0):
doorbell = "110001011111110101010010"
code = ""
# expand 1:1 for 2.4kbps
for c in doorbell:
if c == '1':
code = code + "1110"
else:
code = code + "1000"
# split into array
arr = findall('.{1,8}', code)
# determine packet length + additional prefix and suffix bytes
push_config(s, packet_len(len(arr) + 2))
# send key two times
for e in xrange(0, 2):
# prefix
s.write(chr(0xFF))
for z in arr:
byte = int(z,2)
# invert data
byte_inv = ~byte & 0xFF
s.write(chr(byte_inv))
# suffix
s.write(chr(0xFF))
while reader_run:
sleep(1)
except KeyboardInterrupt:
print "user interrupt"
reader_run = 0
except RuntimeError:
print "bad config"
reader_run = 0
finally:
t.join();
s.close()
rf_analyzer.py
from re import compile,sub,subn
# regexp for data rates
zero={}
zero[1] = compile("1{3}0")
zero[2] = compile("1{6}0")
zero[4] = compile("1{12}0")
zero[8] = compile("1{24}0")
one={}
one[1] = compile("10{3,}")
one[2] = compile("10{6,}")
one[4] = compile("10{12,}")
one[8] = compile("10{24,}")
# list of know keys
know_keys = [
('doorbell_button_1_first', "10001011111110100000010"),
('doorbell_button_1', "110001011111110101010010"),
('doorbell_button_2_first', "10001011111110100000000"),
('doorbell_button_2', "110001011111110101010000"),
]
def analyze_ook(data, baud, bin_input=False):
bin_str = ""
# if input is already a "bin string" e.g. "11110001001010101", do nothing
# else convert raw data to above string
if (not bin_input):
bin_str = ''.join(format(ord(byte), '08b') for byte in data)
else:
bin_str = data
# determine speed of data by checking every speed regexp
# then sort tupple and return one with biggest match
speed = sorted(zero, key=lambda obj:zero[obj].subn('', bin_str))[0]
# use matched regex to normalize data into _ZERO_ and _ONE_, remove every
# tramsnission glitch at the end by removing 1 and 0 orphans
normalized_bin_str = sub('[1,0]', '', zero[speed].sub('_ZERO_', one[speed].sub('_ONE_', bin_str)))
# change back normalized string with '_ZERO_', '_ONE_' to '1' and '0'
decoded = sub('_ZERO_', '1', sub('_ONE_', '0', normalized_bin_str))
# only show logner ones
if (len(decoded) < 4):
return
# search key in know keys
found_key = [key[0] for key in know_keys if decoded == key[1]]
if found_key == []:
found_key = "unknown"
else:
found_key = found_key[0]
# present data
print "Packet len: {} speed: {} OOK decoded: {} key: {}".format(len(bin_str) / 8, baud / speed, decoded, found_key)
print "Original: {}".format(bin_str)
print
cc1101_config.py
from enum import Enum
from math import pow,floor,ceil
import serial
from time import sleep
class CCR(Enum):
IOCFG2 = 0x00
IOCFG1 = 0x01
IOCFG0 = 0x02
FIFOTHR = 0x03
SYNC1 = 0x04
SYNC0 = 0x05
PKTLEN = 0x06
PKTCTRL1 = 0x07
PKTCTRL0 = 0x08
ADDR = 0x09
CHANNR = 0x0A
FSCTRL1 = 0x0B
FSCTRL0 = 0x0C
FREQ2 = 0x0D
FREQ1 = 0x0E
FREQ0 = 0x0F
MDMCFG4 = 0x10
MDMCFG3 = 0x11
MDMCFG2 = 0x12
MDMCFG1 = 0x13
MDMCFG0 = 0x14
DEVIATN = 0x15
MCSM2 = 0x16
MCSM1 = 0x17
MCSM0 = 0x18
FOCCFG = 0x19
BSCFG = 0x1A
AGCCTRL2 = 0x1B
AGCCTRL1 = 0x1C
AGCCTRL0 = 0x1D
WOREVT1 = 0x1E
WOREVT0 = 0x1F
WORCTRL = 0x20
FREND1 = 0x21
FREND0 = 0x22
FSCAL3 = 0x23
FSCAL2 = 0x24
FSCAL1 = 0x25
FSCAL0 = 0x26
RCCTRL1 = 0x27
RCCTRL0 = 0x28
FSTEST = 0x29
PTEST = 0x2A
AGCTEST = 0x2B
TEST2 = 0x2C
TEST1 = 0x2D
TEST0 = 0x2E
PATABLE = 0x3E
_X_OSC = 26000000
_F_DIV = _X_OSC/pow(2,16)
_D_DIV = _X_OSC/pow(2,17)
_S_DIV = _X_OSC/pow(2,18)
def base_frequency(f_desired):
cfg = {}
f_desired *= pow(10, 6) # Mhz
f_div = int(floor(f_desired / CCR._F_DIV))
f_out = int(CCR._F_DIV * ceil(f_div))
print "Corrected frequency {0} Hz".format(f_out)
cfg[CCR.FREQ2] = f_div >> 16
cfg[CCR.FREQ1] = f_div >> 8 & 0xFF
cfg[CCR.FREQ0] = f_div & 0xFF
return cfg
def deviation(f_desired):
cfg = {}
f_desired *= pow(10, 3) # kHz
f_div = int(floor(f_desired / CCR._D_DIV))
y = 0
if f_div < 8:
x = 0
else:
while(1):
x = floor(f_div / pow(2, y) - 8)
if (x > 0x7):
y = y + 1
if (y >= 0x7):
x = 0x7
y = 0x7
break
else:
break
f_out = CCR._D_DIV * (8 + x) * pow(2, y)
print "Corrected deviation {0} Hz".format(f_out)
cfg[CCR.DEVIATN] = int(x) | y << 4
return cfg
def channel_spacing(f_desired):
cfg = {}
f_desired *= pow(10, 3) # kHz
f_div = int(floor(f_desired / CCR._S_DIV))
y = 0
if f_div < 256:
x = 0
else:
while(1):
x = floor(f_div / pow(2, y) - 256)
if (x > 0xFF):
y = y + 1
if (y > 0x3):
x = 0xFF
y = 0x3
break
else:
break
f_out = CCR._S_DIV * (256 + x) * pow(2, y)
print "Corrected spacing {0} Hz".format(f_out)
cfg[CCR.MDMCFG1] = y # no preambule
cfg[CCR.MDMCFG0] = x
return cfg
def channel_bandwidth(f_desired):
cfg = {}
f_desired *= pow(10, 3) # kHz
y = 0
if (f_desired == 0):
x = 0x3
y = 0x3
else:
while(1):
x = int(floor((CCR._X_OSC / f_desired / 8 / pow(2, y)) - 4))
if (x <= -4):
x = 0
y = 0
break
if (x > 0x3):
y = y + 1
if (y > 0x3):
x = 0x3
y = 0x3
break
else:
break
f_out = CCR._X_OSC / (8 * (4 + x) * pow(2, y))
print "Corrected bandwidth {0} Hz".format(f_out)
cfg[CCR.MDMCFG4] = y << 6 | x << 4
return cfg
def data_rate(b_desired):
cfg = {}
b_desired*=1.0
y = 0
while(1):
x = int(floor((b_desired / CCR._X_OSC * pow(2, 28)) / pow(2, y) - 256))
if (x > 0xFF):
y = y + 1
if (y > 0xF):
x = 0xFF
y = 0xF
break
else:
break
if (x < 0):
x = 0
y = 0
b_out = ((256 + x) * pow(2, y)) * CCR._X_OSC / pow(2, 28)
print "Corrected baudrate {0} baud".format(b_out)
cfg[CCR.MDMCFG4] = y
cfg[CCR.MDMCFG3] = x
return cfg
def modulation(modulation):
cfg = {}
mod = {}
mod['2-FSK'] = (0x00 << 4)
mod['GFSK'] = (0x01 << 4)
mod['ASK/OOK'] = (0x03 << 4)
mod['4-FSK'] = (0x04 << 4)
mod['MSK'] = (0x07 << 4)
if modulation in mod:
cfg[CCR.MDMCFG2] = mod[modulation]
else:
print "Wrong modulation, supprted are: {}".format(mod.keys())
raise RuntimeError
return cfg
def sensivity(sensiv):
cfg = {}
sens = {}
sens["24 dB"] = 0
sens["27 dB"] = 1
sens["30 dB"] = 2
sens["33 dB"] = 3
sens["36 dB"] = 4
sens["38 dB"] = 5
sens["40 dB"] = 6
sens["42 dB"] = 7
if sensiv in sens:
cfg[CCR.AGCCTRL2] = sens[sensiv]
else:
print "Wrong sensivity, supprted are: {}".format(sens.keys())
raise RuntimeError
return cfg
def manchaster(on):
cfg = {}
if on:
cfg[CCR.MDMCFG2] = (1<<3)
return cfg
def packet_len(l):
cfg = {}
if (l < 0 or l > 255):
print "Wrong packet len"
raise RuntimeError
else:
cfg[CCR.PKTLEN] = l
return cfg
def join_config(*partial_cfg):
cfg_f = {}
for cfg in partial_cfg:
for key in cfg:
if key in cfg_f:
cfg_f[key] |= cfg[key]
else:
cfg_f[key] = cfg[key]
return cfg_f
def push_config(s, c):
sleep(0.01)
s.setDTR(0)
s.setDTR(1)
for reg, value in c.iteritems():
s.write(bytearray([reg, value]))
# end of config
s.write(bytearray([0xff]))
sleep(0.01)
def reader(s, reader_run, analyze):
while reader_run():
if s.readable():
try:
out = s.read(255)
except:
print "serial exception"
break;
if (out):
analyze(out)
def get_cfg_init():
cfg_init = {}
# most of thi settings is get from SmartRF studio
cfg_init[CCR.IOCFG2] = 0x01; # 1 rx therlshold or end of packet E cs
cfg_init[CCR.IOCFG0] = 0x03;
cfg_init[CCR.FIFOTHR] = 0x47;
cfg_init[CCR.SYNC1] = 0x00;
cfg_init[CCR.SYNC0] = 0x00;
#cfg_init[CCR.PKTLEN] = 0xFF; # not used in variable packet length mode
cfg_init[CCR.PKTCTRL1] = 0x00; # dont append status, no address check
cfg_init[CCR.PKTCTRL0] = 0x00; # fixed packet length, no crc, no whitening
#cfg_init[CCR.ADDR] = 0x00; # not used
cfg_init[CCR.CHANNR] = 0x00; # channel spacing
cfg_init[CCR.FSCTRL1] = 0x0C;
cfg_init[CCR.MDMCFG2] = 0x04 # no preambule, sync carrier sense above threshold
cfg_init[CCR.MCSM1] = (0x00 << 4) | (0x03 << 2) # cca mode = always, rxoff mode = stay_in_rx
cfg_init[CCR.MCSM0] = 0x18;
cfg_init[CCR.FOCCFG] = 0x1D;
cfg_init[CCR.BSCFG] = 0x1C;
cfg_init[CCR.AGCCTRL2] = 0xC0; # second part is CS sennsivity
cfg_init[CCR.AGCCTRL1] = 0x18; # relative carrier sense enabled, aboslute disabled
cfg_init[CCR.AGCCTRL0] = 0xB0;
cfg_init[CCR.WORCTRL] = 0xFB;
cfg_init[CCR.FREND1] = 0x56; #Front End RX Configuration
cfg_init[CCR.FREND0] = 0x11; #Front End TX Configuration
cfg_init[CCR.FSCAL3] = 0xEA;
cfg_init[CCR.FSCAL2] = 0x2A;
cfg_init[CCR.FSCAL1] = 0x00;
cfg_init[CCR.FSCAL0] = 0x1F;
cfg_init[CCR.TEST2] = 0x81;
cfg_init[CCR.TEST1] = 0x35;
cfg_init[CCR.TEST0] = 0x09;
cfg_init[CCR.PATABLE] = 0x12;
#cfg_init[CCR.PATABLE] = 0xC0;
return cfg_init
No comments:
Post a Comment