This code is from project: CC1101 Atmega32u USB dongle + python = RFkitten
rfkitten.py
#!/usr/bin/python import serial import threading from time import sleep from sys import exit,argv from re import split, findall from cc1101_config import * import rf_analyzer # define baud rate / needs to be global sice rf_analyzer.analyze_ook # requires it baud = 9600 # helper function to split data into smaller chunks, thanks to this # you can grab longer packets before analyzing them def split_by_zero(data): # make binnary string e.g "1110001010101011011" out of data bin_str = ''.join(format(ord(byte), '08b') for byte in data) for splitted in split('0{20,}', bin_str): if (len(splitted) > 0): # make sure that we've got some zeroes at the end for regexp splitted+="0" * 20 # analyze and print ook data rf_analyzer.analyze_ook(splitted, baud, bin_input=True) try: # open serial port with max speed s = serial.Serial( port = argv[1], baudrate = 4000000, timeout = 0.01 ) # thread run flag reader_run = 1 # define and start reading thread, you need to pass function that will # fetch the data, here we have split_by_zero which is defined above # with smaller fixed-packet lengths, you could directly pass rf_analyzer.analyze_ook here t = threading.Thread(target=reader, args=(s, lambda: reader_run, split_by_zero)) #t = threading.Thread(target=reader, args=(s, lambda: reader_run, rf_analyzer.analyze_ook)) t.start() # create modem configuration cfg_final = join_config( get_cfg_init(), modulation("ASK/OOK"), manchaster(0), base_frequency(433.92), sensivity("27 dB"), channel_bandwidth(100), data_rate(baud), packet_len(128), ) # send config to cc1101 push_config(s, cfg_final) # here you can test transmit mode if(0): doorbell = "110001011111110101010010" code = "" # expand 1:1 for 2.4kbps for c in doorbell: if c == '1': code = code + "1110" else: code = code + "1000" # split into array arr = findall('.{1,8}', code) # determine packet length + additional prefix and suffix bytes push_config(s, packet_len(len(arr) + 2)) # send key two times for e in xrange(0, 2): # prefix s.write(chr(0xFF)) for z in arr: byte = int(z,2) # invert data byte_inv = ~byte & 0xFF s.write(chr(byte_inv)) # suffix s.write(chr(0xFF)) while reader_run: sleep(1) except KeyboardInterrupt: print "user interrupt" reader_run = 0 except RuntimeError: print "bad config" reader_run = 0 finally: t.join(); s.close()
rf_analyzer.py
from re import compile,sub,subn # regexp for data rates zero={} zero[1] = compile("1{3}0") zero[2] = compile("1{6}0") zero[4] = compile("1{12}0") zero[8] = compile("1{24}0") one={} one[1] = compile("10{3,}") one[2] = compile("10{6,}") one[4] = compile("10{12,}") one[8] = compile("10{24,}") # list of know keys know_keys = [ ('doorbell_button_1_first', "10001011111110100000010"), ('doorbell_button_1', "110001011111110101010010"), ('doorbell_button_2_first', "10001011111110100000000"), ('doorbell_button_2', "110001011111110101010000"), ] def analyze_ook(data, baud, bin_input=False): bin_str = "" # if input is already a "bin string" e.g. "11110001001010101", do nothing # else convert raw data to above string if (not bin_input): bin_str = ''.join(format(ord(byte), '08b') for byte in data) else: bin_str = data # determine speed of data by checking every speed regexp # then sort tupple and return one with biggest match speed = sorted(zero, key=lambda obj:zero[obj].subn('', bin_str))[0] # use matched regex to normalize data into _ZERO_ and _ONE_, remove every # tramsnission glitch at the end by removing 1 and 0 orphans normalized_bin_str = sub('[1,0]', '', zero[speed].sub('_ZERO_', one[speed].sub('_ONE_', bin_str))) # change back normalized string with '_ZERO_', '_ONE_' to '1' and '0' decoded = sub('_ZERO_', '1', sub('_ONE_', '0', normalized_bin_str)) # only show logner ones if (len(decoded) < 4): return # search key in know keys found_key = [key[0] for key in know_keys if decoded == key[1]] if found_key == []: found_key = "unknown" else: found_key = found_key[0] # present data print "Packet len: {} speed: {} OOK decoded: {} key: {}".format(len(bin_str) / 8, baud / speed, decoded, found_key) print "Original: {}".format(bin_str) print
cc1101_config.py
from enum import Enum from math import pow,floor,ceil import serial from time import sleep class CCR(Enum): IOCFG2 = 0x00 IOCFG1 = 0x01 IOCFG0 = 0x02 FIFOTHR = 0x03 SYNC1 = 0x04 SYNC0 = 0x05 PKTLEN = 0x06 PKTCTRL1 = 0x07 PKTCTRL0 = 0x08 ADDR = 0x09 CHANNR = 0x0A FSCTRL1 = 0x0B FSCTRL0 = 0x0C FREQ2 = 0x0D FREQ1 = 0x0E FREQ0 = 0x0F MDMCFG4 = 0x10 MDMCFG3 = 0x11 MDMCFG2 = 0x12 MDMCFG1 = 0x13 MDMCFG0 = 0x14 DEVIATN = 0x15 MCSM2 = 0x16 MCSM1 = 0x17 MCSM0 = 0x18 FOCCFG = 0x19 BSCFG = 0x1A AGCCTRL2 = 0x1B AGCCTRL1 = 0x1C AGCCTRL0 = 0x1D WOREVT1 = 0x1E WOREVT0 = 0x1F WORCTRL = 0x20 FREND1 = 0x21 FREND0 = 0x22 FSCAL3 = 0x23 FSCAL2 = 0x24 FSCAL1 = 0x25 FSCAL0 = 0x26 RCCTRL1 = 0x27 RCCTRL0 = 0x28 FSTEST = 0x29 PTEST = 0x2A AGCTEST = 0x2B TEST2 = 0x2C TEST1 = 0x2D TEST0 = 0x2E PATABLE = 0x3E _X_OSC = 26000000 _F_DIV = _X_OSC/pow(2,16) _D_DIV = _X_OSC/pow(2,17) _S_DIV = _X_OSC/pow(2,18) def base_frequency(f_desired): cfg = {} f_desired *= pow(10, 6) # Mhz f_div = int(floor(f_desired / CCR._F_DIV)) f_out = int(CCR._F_DIV * ceil(f_div)) print "Corrected frequency {0} Hz".format(f_out) cfg[CCR.FREQ2] = f_div >> 16 cfg[CCR.FREQ1] = f_div >> 8 & 0xFF cfg[CCR.FREQ0] = f_div & 0xFF return cfg def deviation(f_desired): cfg = {} f_desired *= pow(10, 3) # kHz f_div = int(floor(f_desired / CCR._D_DIV)) y = 0 if f_div < 8: x = 0 else: while(1): x = floor(f_div / pow(2, y) - 8) if (x > 0x7): y = y + 1 if (y >= 0x7): x = 0x7 y = 0x7 break else: break f_out = CCR._D_DIV * (8 + x) * pow(2, y) print "Corrected deviation {0} Hz".format(f_out) cfg[CCR.DEVIATN] = int(x) | y << 4 return cfg def channel_spacing(f_desired): cfg = {} f_desired *= pow(10, 3) # kHz f_div = int(floor(f_desired / CCR._S_DIV)) y = 0 if f_div < 256: x = 0 else: while(1): x = floor(f_div / pow(2, y) - 256) if (x > 0xFF): y = y + 1 if (y > 0x3): x = 0xFF y = 0x3 break else: break f_out = CCR._S_DIV * (256 + x) * pow(2, y) print "Corrected spacing {0} Hz".format(f_out) cfg[CCR.MDMCFG1] = y # no preambule cfg[CCR.MDMCFG0] = x return cfg def channel_bandwidth(f_desired): cfg = {} f_desired *= pow(10, 3) # kHz y = 0 if (f_desired == 0): x = 0x3 y = 0x3 else: while(1): x = int(floor((CCR._X_OSC / f_desired / 8 / pow(2, y)) - 4)) if (x <= -4): x = 0 y = 0 break if (x > 0x3): y = y + 1 if (y > 0x3): x = 0x3 y = 0x3 break else: break f_out = CCR._X_OSC / (8 * (4 + x) * pow(2, y)) print "Corrected bandwidth {0} Hz".format(f_out) cfg[CCR.MDMCFG4] = y << 6 | x << 4 return cfg def data_rate(b_desired): cfg = {} b_desired*=1.0 y = 0 while(1): x = int(floor((b_desired / CCR._X_OSC * pow(2, 28)) / pow(2, y) - 256)) if (x > 0xFF): y = y + 1 if (y > 0xF): x = 0xFF y = 0xF break else: break if (x < 0): x = 0 y = 0 b_out = ((256 + x) * pow(2, y)) * CCR._X_OSC / pow(2, 28) print "Corrected baudrate {0} baud".format(b_out) cfg[CCR.MDMCFG4] = y cfg[CCR.MDMCFG3] = x return cfg def modulation(modulation): cfg = {} mod = {} mod['2-FSK'] = (0x00 << 4) mod['GFSK'] = (0x01 << 4) mod['ASK/OOK'] = (0x03 << 4) mod['4-FSK'] = (0x04 << 4) mod['MSK'] = (0x07 << 4) if modulation in mod: cfg[CCR.MDMCFG2] = mod[modulation] else: print "Wrong modulation, supprted are: {}".format(mod.keys()) raise RuntimeError return cfg def sensivity(sensiv): cfg = {} sens = {} sens["24 dB"] = 0 sens["27 dB"] = 1 sens["30 dB"] = 2 sens["33 dB"] = 3 sens["36 dB"] = 4 sens["38 dB"] = 5 sens["40 dB"] = 6 sens["42 dB"] = 7 if sensiv in sens: cfg[CCR.AGCCTRL2] = sens[sensiv] else: print "Wrong sensivity, supprted are: {}".format(sens.keys()) raise RuntimeError return cfg def manchaster(on): cfg = {} if on: cfg[CCR.MDMCFG2] = (1<<3) return cfg def packet_len(l): cfg = {} if (l < 0 or l > 255): print "Wrong packet len" raise RuntimeError else: cfg[CCR.PKTLEN] = l return cfg def join_config(*partial_cfg): cfg_f = {} for cfg in partial_cfg: for key in cfg: if key in cfg_f: cfg_f[key] |= cfg[key] else: cfg_f[key] = cfg[key] return cfg_f def push_config(s, c): sleep(0.01) s.setDTR(0) s.setDTR(1) for reg, value in c.iteritems(): s.write(bytearray([reg, value])) # end of config s.write(bytearray([0xff])) sleep(0.01) def reader(s, reader_run, analyze): while reader_run(): if s.readable(): try: out = s.read(255) except: print "serial exception" break; if (out): analyze(out) def get_cfg_init(): cfg_init = {} # most of thi settings is get from SmartRF studio cfg_init[CCR.IOCFG2] = 0x01; # 1 rx therlshold or end of packet E cs cfg_init[CCR.IOCFG0] = 0x03; cfg_init[CCR.FIFOTHR] = 0x47; cfg_init[CCR.SYNC1] = 0x00; cfg_init[CCR.SYNC0] = 0x00; #cfg_init[CCR.PKTLEN] = 0xFF; # not used in variable packet length mode cfg_init[CCR.PKTCTRL1] = 0x00; # dont append status, no address check cfg_init[CCR.PKTCTRL0] = 0x00; # fixed packet length, no crc, no whitening #cfg_init[CCR.ADDR] = 0x00; # not used cfg_init[CCR.CHANNR] = 0x00; # channel spacing cfg_init[CCR.FSCTRL1] = 0x0C; cfg_init[CCR.MDMCFG2] = 0x04 # no preambule, sync carrier sense above threshold cfg_init[CCR.MCSM1] = (0x00 << 4) | (0x03 << 2) # cca mode = always, rxoff mode = stay_in_rx cfg_init[CCR.MCSM0] = 0x18; cfg_init[CCR.FOCCFG] = 0x1D; cfg_init[CCR.BSCFG] = 0x1C; cfg_init[CCR.AGCCTRL2] = 0xC0; # second part is CS sennsivity cfg_init[CCR.AGCCTRL1] = 0x18; # relative carrier sense enabled, aboslute disabled cfg_init[CCR.AGCCTRL0] = 0xB0; cfg_init[CCR.WORCTRL] = 0xFB; cfg_init[CCR.FREND1] = 0x56; #Front End RX Configuration cfg_init[CCR.FREND0] = 0x11; #Front End TX Configuration cfg_init[CCR.FSCAL3] = 0xEA; cfg_init[CCR.FSCAL2] = 0x2A; cfg_init[CCR.FSCAL1] = 0x00; cfg_init[CCR.FSCAL0] = 0x1F; cfg_init[CCR.TEST2] = 0x81; cfg_init[CCR.TEST1] = 0x35; cfg_init[CCR.TEST0] = 0x09; cfg_init[CCR.PATABLE] = 0x12; #cfg_init[CCR.PATABLE] = 0xC0; return cfg_init
No comments:
Post a Comment