Tuesday, August 16, 2016

CC1101 Atmega32u "RFkitten" python source code


This code is from the project "CC1101 Atmega32u USB dongle + python = RFkitten".

rfkitten.py

#!/usr/bin/python
import serial
import threading
from time import sleep
from sys import exit,argv
from re import split, findall

from cc1101_config import *
import rf_analyzer

# Radio data rate in baud. Kept at module level because split_by_zero()
# forwards it to rf_analyzer.analyze_ook(), and it is also passed to
# data_rate() when the modem configuration is built.
baud = 9600

# Cuts a long capture into separate bursts so that longer packets can be
# grabbed from the radio and still be analyzed one transmission at a time.
def split_by_zero(data):
   """Split raw received bytes on long silences and analyze every burst.

   data -- raw byte string; every element is passed through ord()

   The bytes are expanded to a string of '0'/'1' characters, split wherever
   20 or more consecutive '0' occur, and each non-empty piece is handed to
   rf_analyzer.analyze_ook() together with the module-level baud.
   """
   # binary string such as "1110001010101011011" built from the raw bytes
   bits = ''.join(format(ord(byte), '08b') for byte in data)

   for burst in split('0{20,}', bits):
      if len(burst) <= 0:
         continue
      # the analyzer regexps need trailing zeroes after the last pulse
      padded = burst + "0" * 20
      # analyze and print ook data
      rf_analyzer.analyze_ook(padded, baud, bin_input=True)

# Script body: open the dongle's serial port, start the reader thread, push
# the modem configuration and then idle until interrupted.

# bail out with a hint instead of failing on a missing port argument
if len(argv) < 2:
   print("usage: {} <serial port>".format(argv[0]))
   exit(1)

# Handles are pre-set to None so the cleanup in `finally` stays valid even
# when opening the port or starting the thread fails part-way.
s = None
t = None

# thread run flag, polled by the reader thread through the lambda below
reader_run = 1

try:
   # open serial port with max speed
   s = serial.Serial(
         port = argv[1],
         baudrate = 4000000,
         timeout = 0.01
       )

   # define and start reading thread, you need to pass function that will
   # fetch the data, here we have split_by_zero which is defined above
   # with smaller fixed-packet lengths, you could directly pass rf_analyzer.analyze_ook here
   t = threading.Thread(target=reader, args=(s, lambda: reader_run, split_by_zero))
   #t = threading.Thread(target=reader, args=(s, lambda: reader_run, rf_analyzer.analyze_ook))
   t.start()

   # create modem configuration
   cfg_final = join_config(
                  get_cfg_init(),
                  modulation("ASK/OOK"),
                  manchaster(0),
                  base_frequency(433.92),
                  sensivity("27 dB"),
                  channel_bandwidth(100),
                  data_rate(baud),
                  packet_len(128),
               )

   # send config to cc1101
   push_config(s, cfg_final)

   # here you can test transmit mode (change the 0 to 1 to enable it)
   if 0:
      doorbell = "110001011111110101010010"

      # expand every key bit into four symbols: '1' -> 1110, '0' -> 1000
      # (original note: "expand 1:1 for 2.4kbps")
      code = ''.join("1110" if c == '1' else "1000" for c in doorbell)

      # split into array of (up to) 8-symbol strings, one per byte
      arr = findall('.{1,8}', code)

      # determine packet length + additional prefix and suffix bytes
      push_config(s, packet_len(len(arr) + 2))

      # send key two times
      for _ in range(2):
         # prefix
         s.write(chr(0xFF))
         for z in arr:
            byte = int(z, 2)
            # invert data
            byte_inv = ~byte & 0xFF
            s.write(chr(byte_inv))
         # suffix
         s.write(chr(0xFF))

   # idle until interrupted; also leave when the reader thread has ended
   # (it stops itself after a serial error), otherwise we would spin forever
   while reader_run and t.is_alive():
      sleep(1)

except KeyboardInterrupt:
   print("user interrupt")

except RuntimeError:
   # raised by the config helpers on an unsupported setting
   print("bad config")

finally:
   # Always stop the reader, whatever ended the try block; with the flag
   # still set, join() below would block forever.
   reader_run = 0
   if t is not None:
      t.join()
   if s is not None:
      s.close()

rf_analyzer.py

from re import compile,sub,subn

# Pulse regexps, keyed by the oversampling multiplier (1, 2, 4 or 8):
#   zero[m] matches 3*m consecutive '1' followed by a '0'
#   one[m]  matches a '1' followed by at least 3*m '0'
# analyze_ook() picks one multiplier and reports the speed as baud / multiplier.
zero = {}
one = {}
for _mult in (1, 2, 4, 8):
   zero[_mult] = compile("1{%d}0" % (3 * _mult))
   one[_mult] = compile("10{%d,}" % (3 * _mult))
del _mult

# list of known keys as (name, decoded bit string) pairs
know_keys = [
   ('doorbell_button_1_first',  "10001011111110100000010"),
   ('doorbell_button_1',       "110001011111110101010010"),
   ('doorbell_button_2_first',  "10001011111110100000000"),
   ('doorbell_button_2',       "110001011111110101010000"),
]

def analyze_ook(data, baud, bin_input=False):
   """Decode one OOK capture and print what was found.

   data      -- raw byte string, or a '0'/'1' character string when
                bin_input is true
   baud      -- sampling baud rate; only used for the printed speed
   bin_input -- set when data is already a "bin string" such as
                "11110001001010101"

   Matches of the zero[] pattern (a long run of '1') are decoded as '1',
   matches of the one[] pattern (a '1' followed by a long run of '0') as
   '0'. Results shorter than 4 bits are dropped silently. Returns None.
   """
   # work on a '0'/'1' string; raw bytes are expanded first
   if bin_input:
      bin_str = data
   else:
      bin_str = ''.join(format(ord(byte), '08b') for byte in data)

   # Pick the speed multiplier: every zero[] regexp is applied with subn(),
   # which yields the tuple (string left after removing matches, match
   # count); the multiplier whose tuple sorts lowest is used.
   def leftover(multiplier):
      return zero[multiplier].subn('', bin_str)

   speed = min(zero, key=leftover)

   # tag the pulses with placeholder words, then drop every character that
   # was not part of a recognised pulse (transmission glitches, orphans)
   tagged = one[speed].sub('_ONE_', bin_str)
   tagged = zero[speed].sub('_ZERO_', tagged)
   normalized_bin_str = sub('[1,0]', '', tagged)

   # turn the placeholder words into the decoded bits
   decoded = normalized_bin_str.replace('_ONE_', '0').replace('_ZERO_', '1')

   # only show longer ones
   if len(decoded) < 4:
      return

   # look the decoded string up in the known keys, first match wins
   found_key = "unknown"
   for name, code in know_keys:
      if decoded == code:
         found_key = name
         break

   # present data
   print("Packet len: {} speed: {} OOK decoded: {} key: {}".format(len(bin_str) / 8, baud / speed, decoded, found_key))
   print("Original: {}".format(bin_str))
   print("")

cc1101_config.py

from enum import Enum
from math import pow,floor,ceil

import serial
from time import sleep

class CCR(object):
   """Register identifiers used as configuration keys, plus clock constants.

   This is deliberately a plain namespace of numbers and not a subclass of
   the standard library Enum: the rest of the module uses these values
   directly as integers and floats (push_config() puts them into a
   bytearray, the frequency helpers divide by _F_DIV / _D_DIV / _S_DIV).
   The standard Enum would wrap every value, including the underscore
   constants, in a member object that supports none of that.
   """
   IOCFG2   = 0x00
   IOCFG1   = 0x01
   IOCFG0   = 0x02
   FIFOTHR  = 0x03
   SYNC1    = 0x04
   SYNC0    = 0x05
   PKTLEN   = 0x06
   PKTCTRL1 = 0x07
   PKTCTRL0 = 0x08
   ADDR     = 0x09
   CHANNR   = 0x0A
   FSCTRL1  = 0x0B
   FSCTRL0  = 0x0C
   FREQ2    = 0x0D
   FREQ1    = 0x0E
   FREQ0    = 0x0F
   MDMCFG4  = 0x10
   MDMCFG3  = 0x11
   MDMCFG2  = 0x12
   MDMCFG1  = 0x13
   MDMCFG0  = 0x14
   DEVIATN  = 0x15
   MCSM2    = 0x16
   MCSM1    = 0x17
   MCSM0    = 0x18
   FOCCFG   = 0x19
   BSCFG    = 0x1A
   AGCCTRL2 = 0x1B
   AGCCTRL1 = 0x1C
   AGCCTRL0 = 0x1D
   WOREVT1  = 0x1E
   WOREVT0  = 0x1F
   WORCTRL  = 0x20
   FREND1   = 0x21
   FREND0   = 0x22
   FSCAL3   = 0x23
   FSCAL2   = 0x24
   FSCAL1   = 0x25
   FSCAL0   = 0x26
   RCCTRL1  = 0x27
   RCCTRL0  = 0x28
   FSTEST   = 0x29
   PTEST    = 0x2A
   AGCTEST  = 0x2B
   TEST2    = 0x2C
   TEST1    = 0x2D
   TEST0    = 0x2E
   PATABLE  = 0x3E

   # oscillator frequency in Hz and the step sizes derived from it; the
   # float exponent keeps these true (non-integer) divisions
   _X_OSC   = 26000000
   _F_DIV   = _X_OSC / 2.0 ** 16   # step used by base_frequency()
   _D_DIV   = _X_OSC / 2.0 ** 17   # step used by deviation()
   _S_DIV   = _X_OSC / 2.0 ** 18   # step used by channel_spacing()


def base_frequency(f_desired):
   """Return the FREQ2/FREQ1/FREQ0 settings for a carrier given in MHz.

   The frequency is expressed as a whole number of CCR._F_DIV steps
   (rounded down); the frequency actually obtained is printed. The step
   count is spread over the three registers, most significant part first.
   """
   hz_wanted = f_desired * 1e6 # MHz -> Hz

   steps = int(floor(hz_wanted / CCR._F_DIV))
   hz_actual = int(CCR._F_DIV * steps)

   print("Corrected frequency {0} Hz".format(hz_actual))

   return {
      CCR.FREQ2: steps >> 16,
      CCR.FREQ1: (steps >> 8) & 0xFF,
      CCR.FREQ0: steps & 0xFF,
   }


def deviation(f_desired):
   """Return the DEVIATN setting for a frequency deviation given in kHz.

   The deviation is modelled as CCR._D_DIV * (8 + m) * 2**e with a 3-bit
   mantissa m and a 3-bit exponent e. The smallest exponent whose mantissa
   fits is used; requests below the range give m = e = 0, requests above it
   are clamped to m = e = 7. The resulting deviation is printed.

   Every exponent 0..7 is tried. The earlier version clamped to the
   maximum as soon as the exponent reached 7, without testing whether a
   smaller mantissa would fit there.
   """
   f_desired *= 1e3 # kHz -> Hz

   steps = int(floor(f_desired / CCR._D_DIV))

   if steps < 8:
      mantissa, exponent = 0, 0
   else:
      # fallback when even the largest exponent cannot hold the request
      mantissa, exponent = 0x7, 0x7
      for e in range(8):
         # floor(steps / 2**e - 8) for a non-negative integer step count
         m = (steps >> e) - 8
         if m <= 0x7:
            mantissa, exponent = m, e
            break

   f_out = CCR._D_DIV * (8 + mantissa) * 2 ** exponent

   print("Corrected deviation {0} Hz".format(f_out))

   # mantissa in the low bits, exponent from bit 4 upwards
   return {CCR.DEVIATN: mantissa | (exponent << 4)}


def channel_spacing(f_desired):
   """Return the MDMCFG1/MDMCFG0 settings for a channel spacing in kHz.

   The spacing is modelled as CCR._S_DIV * (256 + m) * 2**e with an 8-bit
   mantissa m and an exponent e in 0..3. The smallest exponent whose
   mantissa fits is used; requests below the range give m = e = 0, requests
   above it are clamped to m = 0xFF, e = 3. The resulting spacing is
   printed.

   Both register values are plain ints, as push_config() needs them for
   bytearray(); the mantissa used to be left as a float.
   """
   f_desired *= 1e3 # kHz -> Hz

   steps = int(floor(f_desired / CCR._S_DIV))

   if steps < 256:
      mantissa, exponent = 0, 0
   else:
      # fallback when even the largest exponent cannot hold the request
      mantissa, exponent = 0xFF, 0x3
      for e in range(4):
         # floor(steps / 2**e - 256) for a non-negative integer step count
         m = (steps >> e) - 256
         if m <= 0xFF:
            mantissa, exponent = m, e
            break

   f_out = CCR._S_DIV * (256 + mantissa) * 2 ** exponent

   print("Corrected spacing {0} Hz".format(f_out))

   cfg = {}
   # only the exponent is written, every other bit of MDMCFG1 stays 0
   # (original note: "no preambule")
   cfg[CCR.MDMCFG1] = exponent
   cfg[CCR.MDMCFG0] = mantissa

   return cfg


def channel_bandwidth(f_desired):
   """Return the bandwidth bits of MDMCFG4 for a filter bandwidth in kHz.

   The bandwidth is modelled as CCR._X_OSC / (8 * (4 + m) * 2**e) with m
   and e both in 0..3. The smallest exponent whose mantissa fits is used.
   A request of 0 selects the narrowest filter (m = e = 3), as does any
   request too narrow to represent. A request wider than the widest filter
   selects m = e = 0. The resulting bandwidth is printed.

   Any negative mantissa is clamped to 0. The earlier check only caught
   values of -4 and below, so -3..-1 reached the register as a negative
   number.
   """
   f_desired *= 1e3 # kHz -> Hz

   # narrowest setting; also the fallback when no exponent fits
   mantissa, exponent = 0x3, 0x3

   if f_desired != 0:
      for e in range(4):
         m = int(floor(CCR._X_OSC / f_desired / 8 / 2.0 ** e - 4))
         if m < 0:
            # wider than the widest filter: use the widest
            mantissa, exponent = 0, 0
            break
         if m <= 0x3:
            mantissa, exponent = m, e
            break

   f_out = CCR._X_OSC / (8 * (4 + mantissa) * 2.0 ** exponent)

   print("Corrected bandwidth {0} Hz".format(f_out))

   # exponent in bits 7:6, mantissa in bits 5:4; the low nibble is left for
   # data_rate(), join_config() ORs the two parts together
   return {CCR.MDMCFG4: exponent << 6 | mantissa << 4}

def data_rate(b_desired):
   """Return the MDMCFG4/MDMCFG3 settings for a data rate given in baud.

   The rate is modelled as (256 + m) * 2**e * CCR._X_OSC / 2**28 with an
   8-bit mantissa m and an exponent e in 0..15. The smallest exponent whose
   mantissa fits is used; too high a rate is clamped to m = 0xFF, e = 0xF
   and too low a rate to m = e = 0. The resulting rate is printed.
   """
   # desired rate expressed in units of _X_OSC / 2**28 (float arithmetic)
   scaled = b_desired * 1.0 / CCR._X_OSC * 2.0 ** 28

   for exponent in range(0x10):
      mantissa = int(floor(scaled / 2.0 ** exponent - 256))
      if mantissa <= 0xFF:
         break
   else:
      # nothing fitted: saturate at the top of the range
      mantissa, exponent = 0xFF, 0xF

   if mantissa < 0:
      mantissa, exponent = 0, 0

   b_out = ((256 + mantissa) * 2.0 ** exponent) * CCR._X_OSC / 2.0 ** 28

   print("Corrected baudrate {0} baud".format(b_out))

   # exponent goes to the low bits of MDMCFG4, mantissa fills MDMCFG3
   return {CCR.MDMCFG4: exponent, CCR.MDMCFG3: mantissa}

def modulation(modulation):
   """Return the MDMCFG2 modulation-format bits for a modulation name.

   Supported names are '2-FSK', 'GFSK', 'ASK/OOK', '4-FSK' and 'MSK'; the
   format code is placed at bit 4 and above. Any other name prints the
   supported ones and raises RuntimeError.
   """
   formats = (
      ('2-FSK',   0x00),
      ('GFSK',    0x01),
      ('ASK/OOK', 0x03),
      ('4-FSK',   0x04),
      ('MSK',     0x07),
   )

   mod = {}
   for label, code in formats:
      mod[label] = code << 4

   if modulation not in mod:
      print("Wrong modulation, supprted are: {}".format(mod.keys()))
      raise RuntimeError

   return {CCR.MDMCFG2: mod[modulation]}


def sensivity(sensiv):
   """Return the AGCCTRL2 bits for a sensitivity label such as "27 dB".

   The eight supported labels ("24 dB" ... "42 dB") map to the values
   0..7 in the order listed below. Any other label prints the supported
   ones and raises RuntimeError.
   """
   labels = ("24 dB", "27 dB", "30 dB", "33 dB",
             "36 dB", "38 dB", "40 dB", "42 dB")

   sens = {}
   for code, label in enumerate(labels):
      sens[label] = code

   if sensiv not in sens:
      print("Wrong sensivity, supprted are: {}".format(sens.keys()))
      raise RuntimeError

   return {CCR.AGCCTRL2: sens[sensiv]}

def manchaster(on):
   """Return the MDMCFG2 bit for Manchester coding, or nothing when off.

   A true value yields bit 3 of MDMCFG2; a false value yields an empty
   config so the register is left to the other config parts.
   """
   return {CCR.MDMCFG2: (1 << 3)} if on else {}

def packet_len(l):
   """Return the PKTLEN setting for a packet length of l bytes.

   Lengths outside 0..255 print an error and raise RuntimeError.
   """
   if l < 0 or l > 255:
      print("Wrong packet len")
      raise RuntimeError

   return {CCR.PKTLEN: l}

def join_config(*partial_cfg):
   """Merge partial register configs into one new dict.

   The parts are processed in the order given. The first value seen for a
   register is stored as is; every later value for the same register is
   OR-ed into it, so several helpers can each contribute their own bits.
   The input dicts are not modified.
   """
   merged = {}
   for part in partial_cfg:
      for register, bits in part.items():
         if register not in merged:
            merged[register] = bits
            continue
         merged[register] |= bits
   return merged


def push_config(s, c):
   """Send a register configuration to the dongle over the serial port.

   s -- open serial port object
   c -- dict mapping register number to value

   DTR is driven low and then high again before anything is written
   (presumably this resets or wakes the dongle firmware -- confirm against
   the firmware). Each register is then sent as a two-byte pair
   [register, value], followed by a single 0xff byte marking the end of
   the configuration. A 10 ms pause is taken before and after.
   """
   pause = 0.01
   end_of_config = bytearray([0xff])

   sleep(pause)
   for level in (0, 1):
      s.setDTR(level)

   for reg in c:
      s.write(bytearray([reg, c[reg]]))

   # end of config
   s.write(end_of_config)

   sleep(pause)


def reader(s, reader_run, analyze):
   """Poll the serial port and pass every non-empty chunk to analyze.

   s          -- serial port object providing readable() and read()
   reader_run -- callable polled before each pass; the loop ends once it
                 returns a false value
   analyze    -- callable receiving each chunk of received data

   The loop also ends, after printing "serial exception", when read()
   raises an ordinary exception. Only Exception subclasses are handled
   this way, so interpreter-level signals such as SystemExit or
   KeyboardInterrupt are no longer swallowed.
   """
   while reader_run():
      if not s.readable():
         continue
      try:
         out = s.read(255)
      except Exception:
         print("serial exception")
         break
      if out:
         analyze(out)


def get_cfg_init():
   """Return a fresh dict with the baseline register configuration.

   The other config helpers are merged on top of this with join_config().
   PKTLEN and ADDR are intentionally not part of the baseline.
   """
   # most of these settings were taken from SmartRF Studio
   baseline = (
      (CCR.IOCFG2,   0x01),  # original note: rx threshold or end of packet
      (CCR.IOCFG0,   0x03),

      (CCR.FIFOTHR,  0x47),

      (CCR.SYNC1,    0x00),
      (CCR.SYNC0,    0x00),

      (CCR.PKTCTRL1, 0x00),  # dont append status, no address check
      (CCR.PKTCTRL0, 0x00),  # fixed packet length, no crc, no whitening

      (CCR.CHANNR,   0x00),  # original note: channel spacing

      (CCR.FSCTRL1,  0x0C),

      (CCR.MDMCFG2,  0x04),  # no preamble, sync: carrier sense above threshold

      # cca mode = always, rxoff mode = stay_in_rx
      (CCR.MCSM1,    (0x00 << 4) | (0x03 << 2)),

      (CCR.MCSM0,    0x18),

      (CCR.FOCCFG,   0x1D),

      (CCR.BSCFG,    0x1C),

      (CCR.AGCCTRL2, 0xC0),  # low bits are added by sensivity()
      (CCR.AGCCTRL1, 0x18),  # relative carrier sense enabled, absolute disabled
      (CCR.AGCCTRL0, 0xB0),

      (CCR.WORCTRL,  0xFB),

      (CCR.FREND1,   0x56),  # Front End RX Configuration
      (CCR.FREND0,   0x11),  # Front End TX Configuration

      (CCR.FSCAL3,   0xEA),
      (CCR.FSCAL2,   0x2A),
      (CCR.FSCAL1,   0x00),
      (CCR.FSCAL0,   0x1F),

      (CCR.TEST2,    0x81),
      (CCR.TEST1,    0x35),
      (CCR.TEST0,    0x09),

      (CCR.PATABLE,  0x12),  # 0xC0 was the alternative noted in the original
   )

   cfg_init = {}
   for register, value in baseline:
      cfg_init[register] = value

   return cfg_init

No comments:

Post a Comment