Showing posts with label raspberry pi. Show all posts

Friday, 4 February 2022

Cheating at Wordle with Python, NLTK and a Raspberry Pi

At the time of writing, the game Wordle is taking the world by storm.  Quite rightly too: it's genius, and so is the guy who invented it.

Here's one I did earlier:


Two things to know about me:

  • I'm rubbish with word games.
  • I like to trick my children into thinking I'm cleverer than I am.

So, having struggled a bit with Wordle, I wondered if I could write some code to more efficiently (for that read "cheat") find Wordle answers.  For this I used my trusty Raspberry Pi, Python and the Natural Language Toolkit (NLTK) module.  I'd used NLTK previously for some online data science courses so I knew it could give me a "corpus" (or set) of words to play with.

First I installed NLTK for use with Python 3 on the Raspberry Pi using this command: sudo pip3 install nltk

Then I looked at which word corpus NLTK has that I could use.  There are some details here and a few places pointed me to the "Brown" corpus as a good place to start.  To make this corpus available for NLTK in a Python script I opened a Python3 shell and ran these commands:

Python 3.7.3 (default, Jan 22 2021, 20:04:44)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import nltk
>>> nltk.download('brown')

That makes a corpus of words ready to use in a Python script.  I won't go into the detail of how Wordle is played, but overall you get 6 goes to guess a 5 letter word.  With each guess, the game tells you for each letter:
  • Whether it is in the final word in the exact position you guessed it in.  I call these exact matches.
  • Whether it is in the final word but not in the exact position you guessed it in.  I call these partial matches.
  • Whether it is not in the final word.  I call these non-matches.
I then played around with snippets of code to examine words from the corpus and rule them in or out based upon whether they contained the exact matches and avoided the non-matches.  That led me overall to an algorithm of:

-Load word corpora (at the time of writing I use Brown, Webtext and Gutenberg)
-Build a dictionary of 5 letter words and their frequency of occurrence in the corpora
-Set up data structures to log exact, partial and non-matches.

-Loop at least six times doing:
1-Make a prediction (which I then enter into Wordle) based on the data structures
2-Input the result of the prediction from Wordle
3-Update some data structures

(Full code is at the end of this post)

Taking step 2 above first, I simply enter the result of each Wordle round in a coded string.  So take this result:

I enter this as S_E,U_N,G_N,A_P,R_P, where _E is for exact, _N is for non-matched and _P is for partial.
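The full listing at the end parses this string inline; as a standalone sketch, the decoding might look like this (parse_round_result is an illustrative helper name, not part of the final script):

```python
def parse_round_result(round_result):
    """Split e.g. 'S_E,U_N,G_N,A_P,R_P' into (letter, result code) pairs."""
    pairs = []
    for item in round_result.split(","):
        letter, code = item.split("_")
        pairs.append((letter, code))
    return pairs
```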

Looking at step 3, I update 3 data structures based upon the input the user provides.  The data structures are:
  • A tuple of exact matches where the tuple contains the letter and the exact match position.  So from the above it would be [('S', 0)].
  • A dictionary of partial matches where the key is the letter and the value is a list of positions that the letter is not in.  So from the above it would be {'A': [3], 'R': [4]}
  • A list of non-matches.  So from the above it would be ['U', 'G']
If it's an exact match I do two things:
1)Update the tuple of exact matches with the new matched letter/position combination.
2)Update the partial match dictionary, because this is a) a new position that a partially matched letter can't be in, and b) it may require removal of an entry, as what was previously partially matched is now fully matched.

If it's a partial match I do two things:
1)Add a new partial (with the letter position) or update the list of positions for an existing partial.
2)If it's a new partial, don't just add the position it is in; also add the positions of all the existing exact matches, as the letter can't be in those positions either.

If it's a non-match I just add the letter to the list of non-matches.

So for the game above, the log output showed this:
Enter the result of that round in format A_E,B_P,C_N where _E for exact, _P for partial, _N for non matched:S_E,U_N,G_N,A_P,R_P
########Processing an exact match for letter S
########Processing a non match for letter U
########Processing a non match for letter G
########Processing a partial match for letter A
########Processing a partial match for letter R
########Exact matches [('S', 0)]
########Partial matches {'A': [0, 1, 3], 'R': [1, 2, 4]}
########Non matches ['O', 'E', 'P', 'T', 'U', 'G']

Finally, looking at step 1, for round 1 I recommend an initial word based upon the following logic:
1)Do a letter frequency count across all the 5 letter words in the corpus.
2)Find a word in the corpus that has each of the 5 most common letters.  This leads me to use "AROSE".
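A condensed sketch of that round-1 logic (the full listing also falls back to windows of less-frequent letters when no word matches; this version skips that):

```python
from collections import Counter

def start_word(words):
    """Return a word made up of exactly the five most common letters across all words."""
    freq = Counter(letter for word in words for letter in word)
    top_five = {letter for letter, _ in freq.most_common(5)}
    for word in words:
        if set(word) == top_five:
            return word
    return None    # no word uses all five top letters; the full script widens the search
```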

Then for subsequent rounds I do the following:
1)Eliminate any words from the corpus that have the letters in the non-matching list.
2)Eliminate any words from the corpus that don't have the exact matching letters in the exact matching position.
3)Eliminate any words from corpus that don't have the partial matching letters or, if they do, have the partial matching letters in the positions logged that they can't be in.

...which results in a list of words which I augment with their frequency of occurrence in the corpora.  I then print this and let the user choose the word to enter next.
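Those three elimination steps can be sketched as a single filter function (filter_candidates is an illustrative name; the full listing at the end builds three intermediate lists instead):

```python
def filter_candidates(words, exact_matches, partial_matches, non_matches):
    """Apply the three elimination steps to a list of candidate 5-letter words."""
    candidates = []
    for word in words:
        # Step 1: reject words containing any non-matched letter
        if any(letter in word for letter in non_matches):
            continue
        # Step 2: reject words lacking an exact-match letter in its known position
        if any(word[pos] != letter for letter, pos in exact_matches):
            continue
        # Step 3: reject words missing a partial letter, or with it in a ruled-out position
        if any(letter not in word or any(word[pos] == letter for pos in positions)
               for letter, positions in partial_matches.items()):
            continue
        candidates.append(word)
    return candidates
```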

The story so far is that I have played 4 games with this code and:
1)I have got the right answer every time, but
2)My 17 year old daughter has got the right answer in fewer guesses 3 times out of 4!

Full code listing:




#!/usr/bin/env python3
from nltk.corpus import brown
from nltk.corpus import webtext
from nltk.corpus import gutenberg
import sys

#Our main 5 letter word list
five_letters = {}    #A dictionary of 5 letter words with frequencies

#Gets a list of 5 letter words
def add_to_list(in_word_list):
  for word in in_word_list:
    if word.isalpha():
      if len(word) == 5:
        if word.upper() not in five_letters:    #as the same word could appear in a variety of cases
          five_letters[word.upper()] = 1
        else:
          five_letters[word.upper()] += 1

#Compares our 5 letter word corpus with the list of letter frequencies to find the best start word
#The best word has all the highest frequency letters just once
def get_start_words(letter_frequencies, start_pos):
  words_found = 0               #Counts when our set of 5 high frequency letters matches a word from our corpus  
  list_start_pos  = start_pos   #Used to track where we are in our letter frequency list
  word_list = []

  #Loop until we've found words from the word corpus or we've exhausted the letter frequency list
  while words_found == 0 and list_start_pos < 22:    #It's 22 as this means the last window covers positions 21,22,23,24,25 in the character frequency list
    #Loop for each of the words in the corpus
    for my_word in five_letters:
      letter_count = 0     #Incremented if we find a letter in the word
      #Then for each of the five letters identified by the outer while loop
      for i in range (list_start_pos, list_start_pos + 5):
        my_letter = letter_frequencies[i][0]
        if my_letter in my_word:
          letter_count += 1 #Count if the letter is in the word
      #See if we found all five letters in the word; a word with a repeated letter can't score 5
      if letter_count == 5:
        word_list.append(my_word)
        words_found += 1
    list_start_pos += 1

  #Return our word list
  return word_list


print ("########Building five letter word list")
add_to_list(brown.words())
print ("Added Brown corpus and word list has {} entries".format(len(five_letters)))
add_to_list(webtext.words())
print ("Added Webtext corpus and word list has {} entries".format(len(five_letters)))
add_to_list(gutenberg.words())
print ("Added Gutenberg corpus and word list has {} entries".format(len(five_letters)))

#Calculate letter frequencies
print ("########Calculating letter frequencies")
freq_dict = {}
for my_word in five_letters:
  #Build letter frequencies
  for my_letter in my_word:
    #See if the letter is a key in the dictionary
    if my_letter in freq_dict.keys():
      freq_dict[my_letter] += 1    #Increment value
    else:
      freq_dict[my_letter] = 1     #Add value

#Sort the dictionary to get the highest probability letters and show the user
sorted_values = sorted(freq_dict.items(), key=lambda x:x[1], reverse=True)
print (sorted_values)

#Holds the round number
round_number = 1

#Holds the exact matches.  A list of tuples
exact_matches = []
#Holds the partial matches.  A dictionary of key is letter, value is list of positions letter is not in
partial_matches = {}
#Holds the non-matched letters.  A list
non_matches = []

#Loop for all the rounds
while round_number < 7 and len(exact_matches) < 5:
  print ("######################################################")
  print ("########This is round number {}".format(round_number))
  #Special case for round 1
  if round_number == 1:
    #Get a list of possible starting words based upon the letter frequencies
    print ("########Getting a list of words to start off with")

    #Imagine we get 6 wrong starting words!  Account for each
    for k in range (0,3):
      start_words = get_start_words(sorted_values, k)
      print ("Where we start at position {} of the letter frequencies the start words are: {}".format(k, start_words))
  else:
    #Step 1, rule out a bunch of words that have eliminated letters in them
    print ("########Assessing data from previous round to make a recommendation")
    print ("########First rule out words based on letters found not to exist in the answer")
    after_non_match_check = []
    for my_word in five_letters:
      has_ruled_out = False
      for letter in non_matches:
        if letter in my_word:
          has_ruled_out = True
      if not has_ruled_out:
        after_non_match_check.append(my_word)
    print ("########At the end of this we are down to {} words".format(len(after_non_match_check)))

    #Step 2, rule in a set of words based on the matched list
    print ("########Second rule in words based on exact matched letters")
    after_full_match_check = []
    for my_word in after_non_match_check:
      has_ruled_out = False
      for match_tuple in exact_matches:
        if my_word[match_tuple[1]] != match_tuple[0]:
          has_ruled_out = True
      if not has_ruled_out:
        after_full_match_check.append(my_word)
    print ("########At the end of this we are down to {} words".format(len(after_full_match_check)))
    #print (after_full_match_check)

    #Step 3, rule out a set of words where letters are in partial match positions
    print ("########Third rule in words based on partial matched letters")
    after_partial_match_check = []
    for my_word in after_full_match_check:
      has_ruled_out = False
      for my_partial in partial_matches:  #Loop through each dictionary entry
        #First simply check the partial is in the word
        if my_partial in my_word:
          #Now check for the partial positions
          for my_partial_pos in partial_matches[my_partial]:    #Loop through each item in the partial match
            if my_partial == my_word[my_partial_pos]:
              has_ruled_out = True
        else:
          has_ruled_out = True

      if not has_ruled_out:
        after_partial_match_check.append(my_word)
    print ("########At the end of this we are down to {} words.  Recommendation:".format(len(after_partial_match_check)))

    #Form an ordered list of words based on overall word frequency from the corpus that was built right at the start
    suggestion_dict = {}
    ordered_suggestion = {}
    for word_suggestion in after_partial_match_check:
      suggestion_dict[word_suggestion] = five_letters[word_suggestion]

    #Order the suggestions
    ordered_suggestion = sorted(suggestion_dict.items(), key=lambda x:x[1], reverse=True)
    #Tell the user
    if len(ordered_suggestion) < 11:
      print (ordered_suggestion)
    else:
      #Print just the first 10
      print (list(ordered_suggestion)[:10])

  #Get input from the user as to what happened in that round
  round_result = input("Enter the result of that round in format A_E,B_P,C_N where _E for exact, _P for partial, _N for non matched:")
  #Pull round result apart and process each
  result_list = round_result.split(",")
  #Loop for each result
  letter_pos = 0        #Holds which letter result position we're looking at
  for result in result_list:
    #Get the entered letter and the result
    entered_letter = result[0]
    letter_result = result[2]
    if letter_result == "E":
      print("########Processing an exact match for letter {}".format(entered_letter))
      #See if we already have this exact match.  If not, add it
      letter_found = False
      for my_tuple in exact_matches:
        if my_tuple[0] == entered_letter and my_tuple[1] == letter_pos:    #So we could have double letters so this checks for existence of the letter in the given position
          letter_found = True
      if not letter_found:
        exact_matches.append((entered_letter,letter_pos))
      #Update existing partial matches as well, i.e. 1)They can't be in the position of the found letter.  Also, if there is already a partial for what is now exact, remove it
      if entered_letter in partial_matches:
        partial_matches.pop(entered_letter)
      else:
        for partial in partial_matches:
          if letter_pos not in partial_matches[partial]:
            partial_matches[partial].append(letter_pos)

    elif letter_result == "P":
      print("########Processing a partial match for letter {}".format(entered_letter))
      if entered_letter in partial_matches:
        #Look at the partial record, seeing if the current position is in the position list.  If not, add it
        if letter_pos not in partial_matches[entered_letter]:
          partial_matches[entered_letter].append(letter_pos)
      else:
        #Entered letter not in partial matches dictionary.  Add it together with the position
        partial_list = []
        partial_list.append(letter_pos)
        #But as this is a new partial we also need to add all existing exact matches which the letter can also not be in that position
        for exact in exact_matches:
          partial_list.append(exact[1])
        #Final update of the partial list
        partial_matches[entered_letter] = partial_list

    elif letter_result == "N":
      print("########Processing a non match for letter {}".format(entered_letter))
      #See if we already have this non-match.  If not, add it
      if entered_letter not in non_matches:
        non_matches.append(entered_letter)
    #Update so we get the next letter position
    letter_pos +=1

  #Show what the structures are at the end of this round
  print ("########Exact matches {}".format(exact_matches))
  print ("########Partial matches {}".format(partial_matches))
  print ("########Non matches {}".format(non_matches))

  #End of turn Update for next loop
  round_number += 1

#See how we came out of the main loop
if len(exact_matches) == 5:
  print ("########You won!  Way to go/cheat")
else:
  print ("########All rounds completed, you lost!")

Wednesday, 29 March 2017

Giving Alexa a new Sense - Vision! (Using Microsoft Cognitive APIs)

Amazon Alexa is just awesome so I thought it would be fun to let her "see".  Here's a video of this in action:



...and here's a diagram of the "architecture" for this solution:



The clever bit is the Microsoft Cognitive API so let's look at that first!  You can get a description of the APIs here and sign up for a free account.  To give Alexa "vision" I decided to use the Computer Vision API, which takes an image URL or an uploaded image, analyses it and provides a description.

Using the Microsoft Cognitive API developer console I used the API to analyse the image of a famous person shown below and requested a "Description":



...and within the response JSON I got:

"captions": [ { "text": "Elizabeth II wearing a hat and glasses", "confidence": 0.28962254803103227 } ]

...now that's quite some "hat" she's wearing there (!) but it's a pretty good description.

OK - So here's a step-by-step guide as to how I put it all together.

Step 1 - An Apache Webserver with Python Scripts
I needed AWS Lambda to be able to trigger a picture to be taken and a Cognitive API call to be made so I decided to run this all from a Raspberry Pi + camera in my home.

I already have an Apache webserver running on my Raspberry Pi 2 and there are plenty of descriptions on the internet of how to set one up (like this one).

I like a bit of Python so I decided to use Python scripts to carry out the various tasks.  Enabling Python for cgi-bin is very easy; here's an example of how to do it.

So to test it I created the following script:

#!/usr/bin/env python
print "Content-type: text/html\n\n"
print "<h1>Hello World</h1>"

...and saved it as /usr/lib/cgi-bin/hello.py.  I then tested it by browsing to http://192.168.1.3/cgi-bin/hello.py (where 192.168.1.3 is the IP address on my home LAN that my Pi is sitting on).  I saw this:



Step 2 - cgi-bin Python Script to Take a Picture
The first script I needed was one to trigger my Pi to take a picture with the Raspberry Pi camera.  (More here on setting up and using the camera).

After some trial and error I ended up with this script:

#!/usr/bin/env python
from subprocess import call
import cgi

def picture(PicName):
  call("/usr/bin/raspistill -o /var/www/html/" + PicName + " -t 1s -w 720 -h 480", shell=True)

#Get arguments
ArgStr = ""
arguments = cgi.FieldStorage()
for i in arguments.keys():
 ArgStr = ArgStr + arguments[i].value

#Call a function to get a picture
picture(ArgStr)

print "Content-type: application/json\n\n"

print "{'Response':'OK','Arguments':" + "'" + ArgStr + "'}"

So what does this do?  The ArgStr and for i in arguments.keys() code section makes the Python script analyse the URL entered by the user and extract any query string.  The query string can be used to specify the file name of the photo that is taken.  So for example this URL:

http://192.168.1.3/cgi-bin/take_picture_v1.py?name=hello.jpg

...will mean a picture is taken and saved as hello.jpg.

The picture function then uses call from the subprocess module to run a command line command that takes a picture with the Raspberry Pi camera and saves it in the root directory of the Apache 2 webserver.

Finally the script responds with a simple JSON string that can be rendered in a browser or used by AWS Lambda.  The response looks like this in a browser:


Step 3 - Microsoft Cognitive API for Image Analysis
So now we've got a picture, we need to analyse it.  For this task I leaned heavily on the code published here, so all plaudits and credit to chsienki and none to me.  I used most of the code but removed the lines that overlaid the results on top of the image and showed it on screen.

#!/usr/bin/env python
import time
from subprocess import call
import requests
import cgi

# Variables
#_url = 'https://westus.api.cognitive.microsoft.com/vision/v1.0/analyze'
_url = 'https://westus.api.cognitive.microsoft.com/vision/v1.0/describe'

_key = "your key"   #Here you have to paste your primary key
_maxNumRetries = 10

#Does the actual results request
def processRequest( json, data, headers, params ):

    """
    Helper function to process the request to Project Oxford

    Parameters:
    json: Used when processing images from its URL. See API Documentation
    data: Used when processing image read from disk. See API Documentation
    headers: Used to pass the key information and the data type request
    """

    retries = 0
    result = None

    while True:
        print("This is the URL: " + _url)
        response = requests.request( 'post', _url, json = json, data = data, headers = headers, params = params )

        if response.status_code == 429:

            print( "Message: %s" % ( response.json()['error']['message'] ) )

            if retries <= _maxNumRetries:
                time.sleep(1)
                retries += 1
                continue
            else:
                print( 'Error: failed after retrying!' )
                break

        elif response.status_code == 200 or response.status_code == 201:

            if 'content-length' in response.headers and int(response.headers['content-length']) == 0:
                result = None
            elif 'content-type' in response.headers and isinstance(response.headers['content-type'], str):
                if 'application/json' in response.headers['content-type'].lower():
                    result = response.json() if response.content else None
                elif 'image' in response.headers['content-type'].lower():
                    result = response.content
        else:
            print( "Error code: %d" % ( response.status_code ) )
            #print( "Message: %s" % ( response.json()['error']['message'] ) )
            print (str(response))

        break

    return result

#Get arguments from the query string sent
ArgStr = ""
arguments = cgi.FieldStorage()
for i in arguments.keys():
 ArgStr = ArgStr + arguments[i].value

# Load raw image file into memory
pathToFileInDisk = r'/var/www/html/' + ArgStr

with open( pathToFileInDisk, 'rb' ) as f:
    data = f.read()

# Computer Vision parameters
params = { 'visualFeatures' : 'Color,Categories'}

headers = dict()
headers['Ocp-Apim-Subscription-Key'] = _key
headers['Content-Type'] = 'application/octet-stream'

json = None

result = processRequest( json, data, headers, params )

#Turn to a string
JSONStr = str(result)

#Change single to double quotes
JSONStr = JSONStr.replace(chr(39),chr(34))

#Get rid of preceding u in string
JSONStr = JSONStr.replace("u"+chr(34),chr(34))


if result is not None:
  print "content-type: application/json\n\n"

  print JSONStr

So here I take arguments as before to know which file to process, "read" the file and then use the API to get a description of it.  I had to play a bit with the response to get it into a format that could be parsed by the Python json module.  This is where I turn single quotes to double quotes and get rid of preceding "u" characters.  There's maybe a more Pythonic way to do this, please let me know if you know a way....

When you call the script via a browser you get:


Looking at the JSON structure in more detail you can see a "description" element which is how the Microsoft Cognitive API has described the image.

Step 4 - Alexa Skills Kit Configuration and Lambda Development
The next step is to configure the Alexa Skills kit and write the associated AWS Lambda function.  I've covered how to do this previously (like here) so won't cover all that again here.

The invocation name is "yourself"; hence you can say "Alexa, ask yourself...".

There is only one utterance which is:
AlexaSeeIntent what can you see

...so what you actually say to Alexa is "Alexa, ask yourself what can you see".  

This then maps to the intent structure below:

{
  "intents": [
    {
      "intent": "AlexaSeeIntent"
    },
    {
      "intent": "AMAZON.HelpIntent"
    },
    {
      "intent": "AMAZON.StopIntent"
    },
    {
      "intent": "AMAZON.CancelIntent"
    }
  ]
}

Here we have a boilerplate intent structure with the addition of AlexaSeeIntent, which is what will be passed to the AWS Lambda function.

I won't list the whole AWS Lambda function below, but here's the relevant bits:

#Some constants
TakePictureURL = "http://<URL or IP Address>/cgi-bin/take_picture_v1.py?name=hello.jpg"
DescribePictureURL = "http://<URL or IP Address>/cgi-bin/picture3.py?name=hello.jpg"

Then the main Lambda function to handle the AlexaSeeIntent:

def handle_see(intent, session):
  session_attributes = {}
  reprompt_text = None
  
  #Call the Python script that takes the picture
  APIResponse = urllib2.urlopen(TakePictureURL).read()
  
  #Call the Python script that analyses the picture.  Strip newlines
  APIResponse = urllib2.urlopen(DescribePictureURL).read().strip()
  
  #Turn the response into a JSON object we can parse
  JSONData = json.loads(APIResponse)
  
  PicDescription = str(JSONData["description"]["captions"][0]["text"])
  
  speech_output = PicDescription
  should_end_session = True
  
  # Setting reprompt_text to None signifies that we do not want to reprompt
  # the user. If the user does not respond or says something that is not
  # understood, the session will end.
  return build_response(session_attributes, build_speechlet_response(
        intent['name'], speech_output, reprompt_text, should_end_session))

So super, super simple.  Call the API to take the picture, call the API to analyse it, pick out the description and read it out.

Here's the image that was analysed for the Teddy Bear video:



Here's another example:


The image being:


...and another:


...based upon this image:
  

Now to think about what other senses I can give to Alexa...


Sunday, 8 January 2017

Using the Resources of the Fitbit API

In previous posts I've covered the basics of using a Raspberry Pi and the Fitbit API to extract and analyse the data created by a Fitbit Fitness tracker.  In particular, in this post I covered using OAUTH2.0 to access the API.

For this post I thought I'd do a more general overview of the range of data available through the Fitbit API.  So go back to the OAUTH2.0 post to see how to get access and refresh tokens etc.  Then come back here to see what you can do with the API.

Once you've got the required tokens, all you need to do to access data is specify different URLs.  In this post I'll describe a range of URLs that can be used to access different data.  There's a massive variety of data available and almost limitless combinations so just use this as a set of worked examples then use the Fitbit Developer documentation to work out other options.

Remember I'm just a guy that does this for a hobby and likes to help other people along the way.  If I use the wrong terms or describe things in a less than 100% accurate manner then please take this in the right spirit or even comment below to help me correct matters.

Activity Data
The most generic data available from the API.  Here's a simple URL to give you summary of activity data for a given date:

https://api.fitbit.com/1/user/-/activities/date/2016-12-27.json

So a simple base URL and extra elements to specify "activities" and a date to get data for.  This yields:

{"activities":[],"goals":{"activeMinutes":30,"caloriesOut":2812,"distance":8.05,"floors":25,"steps":10000},"summary":{"activeScore":-1,"activityCalories":1952,"caloriesBMR":1725,"caloriesOut":3353,"distances":[{"activity":"total","distance":16.93},{"activity":"tracker","distance":16.93},{"activity":"loggedActivities","distance":0},{"activity":"veryActive","distance":13.18},{"activity":"moderatelyActive","distance":0.59},{"activity":"lightlyActive","distance":3.16},{"activity":"sedentaryActive","distance":0}],"elevation":155.45,"fairlyActiveMinutes":15,"floors":51,"heartRateZones":[{"caloriesOut":1546.2586,"max":89,"min":30,"minutes":793,"name":"Out of Range"},{"caloriesOut":271.2272,"max":124,"min":89,"minutes":47,"name":"Fat Burn"},{"caloriesOut":21.8036,"max":151,"min":124,"minutes":2,"name":"Cardio"},{"caloriesOut":861.961,"max":220,"min":151,"minutes":57,"name":"Peak"}],"lightlyActiveMinutes":206,"marginalCalories":1332,"restingHeartRate":55,"sedentaryMinutes":725,"steps":17309,"veryActiveMinutes":83}}

So even in raw JSON format you can see some of the key Fitbit metrics that are returned.
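A minimal sketch of calling this endpoint from Python 3 might look like the following.  The access token is a placeholder: obtain a real one via the OAUTH2.0 flow covered in the earlier post, and the helper names are illustrative.

```python
import json
import urllib.request

# Placeholder: obtain a real access token via the OAUTH2.0 flow
ACCESS_TOKEN = "your-access-token"

def activity_summary(date):
    """Fetch the daily activity summary for a date in YYYY-MM-DD format."""
    url = "https://api.fitbit.com/1/user/-/activities/date/{}.json".format(date)
    request = urllib.request.Request(url, headers={"Authorization": "Bearer " + ACCESS_TOKEN})
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))

def key_metrics(activity_json):
    """Pick out a few of the headline metrics from the response."""
    summary = activity_json["summary"]
    return {"steps": summary["steps"],
            "caloriesOut": summary["caloriesOut"],
            "restingHeartRate": summary["restingHeartRate"]}
```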

Step Data
The main reason people get their Fitbit is to count their steps!

Here's a simple example of a URL that provides data for 7 days up to and including the date you specify:

https://api.fitbit.com/1/user/-/activities/steps/date/2016-12-27/7d.json

The response is as follows:

{"activities-steps":[{"dateTime":"2016-12-21","value":"16156"},{"dateTime":"2016-12-22","value":"9075"},{"dateTime":"2016-12-23","value":"7963"},{"dateTime":"2016-12-24","value":"18698"},{"dateTime":"2016-12-25","value":"11316"},{"dateTime":"2016-12-26","value":"11473"},{"dateTime":"2016-12-27","value":"17309"}]}

So here you can see the 7 measurements and how the value for 2016-12-27 matches that of the activity data above.
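A short sketch of pulling that series apart (weekly_steps is an illustrative helper name):

```python
def weekly_steps(steps_json):
    """Turn the 'activities-steps' list into (date, steps) pairs plus a total."""
    series = [(day["dateTime"], int(day["value"]))
              for day in steps_json["activities-steps"]]
    total = sum(steps for _, steps in series)
    return series, total
```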

You could get the same data but by specifying a start and end date by using:

https://api.fitbit.com/1/user/-/activities/steps/date/2016-12-21/2016-12-27.json

If you ask Fitbit nicely they will give you access to intraday data.  See here for more details on how to do this.  An example URL to get 15 minute segments for a single day is:

https://api.fitbit.com/1/user/-/activities/steps/date/2016-12-27/1d/15min.json

Which gives data like this:

{"activities-steps":[{"dateTime":"2016-12-27","value":"17309"}],"activities-steps-intraday":{"dataset":[{"time":"00:00:00","value":0},{"time":"00:15:00","value":0},{"time":"00:30:00","value":0},{"time":"00:45:00","value":0},{"time":"01:00:00","value":0},{"time":"01:15:00","value":0},

...not that interesting for this time period as I was asleep.  It gets better later in the day when I went for a run!

{"time":"08:45:00","value":350},{"time":"09:00:00","value":2016},{"time":"09:15:00","value":2522},{"time":"09:30:00","value":2508},{"time":"09:45:00","value":2555},{"time":"10:00:00","value":628}

Other Measurements
You can use the same URL structure for other key tracker metrics like:

/calories
/distance
/floors

(i.e. replace "/steps" in the above examples with these words).

Sleep
If you have a tracker that measures sleep then you can use a URL like the one below to get sleep data:

https://api.fitbit.com/1/user/-/sleep/date/2016-12-27.json

Which gives data like this at the start:

{"sleep":[{"awakeCount":4,"awakeDuration":4,"awakeningsCount":32,"dateOfSleep":"2016-12-27","duration":29580000,"efficiency":88,"isMainSleep":true,"logId":13314423872,"minuteData":[{"dateTime":"22:38:00","value":"2"},{"dateTime":"22:39:00","value":"3"},{"dateTime":"22:40:00","value":"1"},{"dateTime":"22:41:00","value":"1"},{"dateTime":"22:42:00","value":"1"},{"dateTime":"22:43:00","value":"1"}

So: some generic information, then (by default) a record for every minute of your sleep.  Here the values are:
1=Asleep
2=Awake
3=Really awake

Then a summary at the end:

"summary":{"totalMinutesAsleep":434,"totalSleepRecords":1,"totalTimeInBed":493}}
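A sketch of tallying those per-minute values into totals per state (the helper name is illustrative):

```python
# Value meanings per the Fitbit sleep log: 1=asleep, 2=awake, 3=really awake
SLEEP_STATES = {"1": "asleep", "2": "awake", "3": "really awake"}

def minutes_by_state(sleep_json):
    """Count how many minutes were spent in each sleep state."""
    counts = {"asleep": 0, "awake": 0, "really awake": 0}
    for record in sleep_json["sleep"]:
        for minute in record["minuteData"]:
            counts[SLEEP_STATES[minute["value"]]] += 1
    return counts
```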

Heart Rate
Finally, if you have a tracker that also measures heart rate you can use a URL like the one below to get data:

https://api.fitbit.com/1/user/-/activities/heart/date/2016-12-27/1d.json


{"activities-heart":[{"dateTime":"2016-12-27","value":{"customHeartRateZones":[],"heartRateZones":[{"caloriesOut":1546.2586,"max":89,"min":30,"minutes":793,"name":"Out of Range"},{"caloriesOut":271.2272,"max":124,"min":89,"minutes":47,"name":"Fat Burn"},{"caloriesOut":21.8036,"max":151,"min":124,"minutes":2,"name":"Cardio"},{"caloriesOut":861.961,"max":220,"min":151,"minutes":57,"name":"Peak"}],"restingHeartRate":55}}],"activities-heart-intraday":{"dataset":[{"time":"00:00:00","value":65},{"time":"00:01:00","value":65},{"time":"00:02:00","value":65},{"time":"00:03:00","value":65},{"time":"00:08:00","value":65},{"time":"00:09:00","value":65},{"time":"00:10:00","value":65},{"time":"00:11:00","value":64},{"time":"00:12:00","value":64},{"time":"00:13:00","value":65},{"time":"00:14:00","value":66},{"time":"00:15:00","value":64},{"time":"00:16:00","value":61},

So first some general data then some measurements at up to one minute intervals (if you have access to this data).
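As with the steps data, a small sketch can pull out the zone summary (helper name illustrative):

```python
def minutes_in_zones(heart_json):
    """Map each heart rate zone name to the minutes spent in it for the day."""
    day = heart_json["activities-heart"][0]["value"]
    return {zone["name"]: zone["minutes"] for zone in day["heartRateZones"]}
```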

Summary
So that was a whistle-stop tour of using the API.  Have a play, use different URLs and see what you can get!

Sunday, 17 April 2016

Strava and Fitbit API Mash-up Using Raspberry Pi and Python

Previously I blogged on how I used data from the Fitbit API to look at cadence information gathered during runs I'd logged on Strava.

Now that was all very good and informative but:
  • I analysed the Strava results manually, i.e. stepped through the website and noted down the dates, times and durations of runs.
  • I used the Fitbit OAuth 1.0 URL builder website.  Very manual, and OAuth 1.0 has since been deprecated (see here on using OAuth 2.0).
...so it was time to automate the process and upgrade to OAuth 2.0!  Time to break out the Raspberry Pi and get coding.

Full code at the bottom of this post (to not interrupt the flow) but the algorithm is as follows:
  • Loop, pulling back activity data from the Strava API (method here).
  • Select each Strava run (i.e. filter out rides and swims) and log key details (start date/time, duration, name, distance)
  • Check whether the date of the run was after I got my Fitbit (the MyFitbitEpoch constant).  If it is, form the Fitbit API URL using date and time parameters derived from the Strava API output.
  • Call the Fitbit API using the OAuth 2.0 method.
  • Log the results for later processing.
...easy (the trickiest bit was date/time manipulation)!
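To illustrate that trickiest bit, here's a sketch (with made-up values standing in for Strava's start_date_local and elapsed_time fields) of turning a Strava start timestamp and duration into the Fitbit intraday time window:

```python
from datetime import datetime, timedelta

# Made-up values standing in for start_date_local and elapsed_time from Strava
start_str = "2016-04-16T09:02:03Z"
elapsed_seconds = 1790

# Parse the Strava timestamp, add the elapsed time, then slice out the
# date and HH:MM components the Fitbit intraday URL needs
start = datetime.strptime(start_str, "%Y-%m-%dT%H:%M:%SZ")
end = start + timedelta(seconds=elapsed_seconds)

url = ("https://api.fitbit.com/1/user/-/activities/steps/date/"
       + start.strftime("%Y-%m-%d") + "/1d/1min/time/"
       + start.strftime("%H:%M") + "/" + end.strftime("%H:%M") + ".json")
print(url)
```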

This provides output like this:
pi@raspberrypi:~/Exercise/logs $ head steps_log_1.txt
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,1,09:02:00,100
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,2,09:03:00,169
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,3,09:04:00,170
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,4,09:05:00,171
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,5,09:06:00,172
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,6,09:07:00,170
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,7,09:08:00,170
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,8,09:09:00,170
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,9,09:10:00,168
2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,10,09:11:00,170

So: a date and time plus distance, the name of the run, the minute of the run, the time of day and a step count.
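A quick sketch of pulling one of these log lines apart in Python:

```python
# One line from the log in the format described above:
# DateTime_Distance,Name,MinuteOfRun,TimeOfDay,Steps
line = "2016-04-16T09:02:03Z_4899.7,Parkrun 20160416,3,09:04:00,170"

# Split the comma-separated fields, then split the first field on "_"
# to separate the start timestamp from the distance
date_dist, run_name, minute, time_of_day, steps = line.strip().split(",")
start_datetime, distance = date_dist.split("_")
print(run_name + " minute " + minute + ": " + steps + " steps")
```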

This makes it easy to filter out interesting runs to compare:
pi@raspberrypi:~/Exercise/logs $ grep 20150516 steps_log_1.txt > parkrun_steps_1.txt
pi@raspberrypi:~/Exercise/logs $ grep 20160416 steps_log_1.txt >> parkrun_steps_1.txt

Then import to R for post analysis and plotting:
> parkrun1 <- read.csv(file=file.choose(),head=FALSE,sep=",")

> colnames(parkrun1) <- c("DateTimeDist","Name","Minute","TimeOfDay","Steps") 
> head(parkrun1)
                 DateTimeDist                              Name Minute TimeOfDay Steps
1 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      1  09:00:00    85
2 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      2  09:01:00   105
3 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      3  09:02:00   107
4 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      4  09:03:00   136
5 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      5  09:04:00   162
6 2015-05-16T09:00:00Z_5000.0 Naked Winchester Parkrun 20150516      6  09:05:00   168

> library(ggplot2)
> ggplot(data = parkrun1, aes(x = Minute, y = Steps, color = Name)) +
    geom_point() + geom_line() +
    labs(x="Minute", y="Steps") + ggtitle("Running Cadence - Parkrun")

Yielding this graph:

Interesting that I took longer to get going on the 2015 run, maybe there was congestion at the start.  The key thing I was looking for was the "steady state" cadence comparison between 2015 and 2016.  It's higher in 2016 which is exactly what I wanted to see as it's something I've worked on improving.

Using the same method I plotted the chart below which shows a long run prior to a half-marathon then the half-marathon itself:


Now this is interesting.  The cadence was slightly higher for the whole of the training run (blue line) and much more consistent.  For the half-marathon itself (red line) my cadence really tailed off, which tallies with my last post where I analysed my drop-off in speed over the final quarter of the run.

Here's all the code.  Modify for your API credentials, file system and Fitbit "Epoch" accordingly:

pi@raspberrypi:~/Exercise $ more strava_fitbit_v1.py
#here's a typical Fitbit API URL
#FitbitURL = "https://api.fitbit.com/1/user/-/activities/steps/date/2016-01-31/1d/1min/time/09:00/09:15.json"

import urllib2
import base64
import json
from datetime import datetime, timedelta
import time
import urllib
import sys
import os

#The base URL we use for activities
BaseURLActivities = "https://www.strava.com/api/v3/activities?access_token=<Strava_Token_Here>&per_page=200&page="
StepsLogFile = "/home/pi/Exercise/logs/steps_log_1.txt"

#Start element of Fitbit URL
FitbitURLStart = "https://api.fitbit.com/1/user/-/activities/steps/date/"

#Other constants
MyFitbitEpoch = "2015-01-26"

#Use this URL to refresh the access token
TokenURL = "https://api.fitbit.com/oauth2/token"

#Get and write the tokens from here
IniFile = "/home/pi/Exercise/tokens.txt"

#From the developer site
OAuthTwoClientID = "FitBitClientIDHere"
ClientOrConsumerSecret = "FitbitSecretHere"

#Some constants defining API error handling responses
TokenRefreshedOK = "Token refreshed OK"
ErrorInAPI = "Error when making API call that I couldn't handle"

#Get the config from the config file.  This is the access and refresh tokens
def GetConfig():
  print "Reading from the config file"

  #Open the file
  FileObj = open(IniFile,'r')

  #Read first two lines - first is the access token, second is the refresh token
  AccToken = FileObj.readline()
  RefToken = FileObj.readline()

  #Close the file
  FileObj.close()

  #See if the strings have newline characters on the end.  If so, strip them
  if (AccToken.find("\n") > 0):
    AccToken = AccToken[:-1]
  if (RefToken.find("\n") > 0):
    RefToken = RefToken[:-1]

  #Return values
  return AccToken, RefToken

def WriteConfig(AccToken,RefToken):
  print "Writing new token to the config file"
  print "Writing this: " + AccToken + " and " + RefToken

  #Delete the old config file
  os.remove(IniFile)

  #Open and write to the file
  FileObj = open(IniFile,'w')
  FileObj.write(AccToken + "\n")
  FileObj.write(RefToken + "\n")
  FileObj.close()

#Make a HTTP POST to get a new access token
def GetNewAccessToken(RefToken):
  print "Getting a new access token"

  #RefToken = "e849e1545d8331308eb344ce27bc6b4fe1929d8f1f9f3a056c5636311ba49014"

  #Form the data payload
  BodyText = {'grant_type' : 'refresh_token',
              'refresh_token' : RefToken}
  #URL Encode it
  BodyURLEncoded = urllib.urlencode(BodyText)
  print "Using this as the body when getting access token >>" + BodyURLEncoded

  #Start the request
  tokenreq = urllib2.Request(TokenURL,BodyURLEncoded)
  #Add the headers.  First we base64 encode the client ID and client secret with a : in between to create the authorisation header
  tokenreq.add_header('Authorization', 'Basic ' + base64.b64encode(OAuthTwoClientID + ":" + ClientOrConsumerSecret))
  tokenreq.add_header('Content-Type', 'application/x-www-form-urlencoded')

  #Fire off the request
  try:
    tokenresponse = urllib2.urlopen(tokenreq)

    #See what we got back.  Getting to this part of the code means the call was OK
    FullResponse = tokenresponse.read()

    #Need to pick out the access token and write it to the config file.  Use a JSON manipulation module
    ResponseJSON = json.loads(FullResponse)

    #Read the access token as a string
    NewAccessToken = str(ResponseJSON['access_token'])
    NewRefreshToken = str(ResponseJSON['refresh_token'])
    #Write the access token to the ini file
    WriteConfig(NewAccessToken,NewRefreshToken)

    print "New access token output >>> " + FullResponse
  except urllib2.URLError as e:
    #Getting to this part of the code means we got an error
    print "An error was raised when getting the access token.  Need to stop here"
    print e.code
    print e.read()
    sys.exit()

#This makes an API call.  It also catches errors and tries to deal with them
def MakeAPICall(InURL,AccToken,RefToken):
  #Start the request
  req = urllib2.Request(InURL)

  #Add the access token in the header
  req.add_header('Authorization', 'Bearer ' + AccToken)

  print "I used this access token " + AccToken
  #Fire off the request
  try:
    #Do the request
    response = urllib2.urlopen(req)
    #Read the response
    FullResponse = response.read()

    #Return values
    return True, FullResponse
  #Catch errors, e.g. A 401 error that signifies the need for a new access token
  except urllib2.URLError as e:
    print "Got this HTTP error: " + str(e.code)
    HTTPErrorMessage = e.read()
    print "This was in the HTTP error message: " + HTTPErrorMessage
    #See what the error was
    if (e.code == 401) and (HTTPErrorMessage.find("Access token invalid or expired") > 0):
      GetNewAccessToken(RefToken)
      return False, TokenRefreshedOK
    elif (e.code == 401) and (HTTPErrorMessage.find("Access token expired") > 0):
      GetNewAccessToken(RefToken)
      return False, TokenRefreshedOK
    #Return that this didn't work, allowing the calling function to handle it
    return False, ErrorInAPI


#This function takes a date and time and checks whether it's after a given date
def CheckAfterFitbit(InDateTime):
  #See how many days there were between the Strava activity date and my first Fitbit date.
  StravaDate = datetime.strptime(InDateTime,"%Y-%m-%dT%H:%M:%SZ")    #Strava activity date as a Python date object
  FitbitDate = datetime.strptime(MyFitbitEpoch,"%Y-%m-%d")                   #First Fitbit date as a Python date object

  #See if the provided date is greater than the Fitbit date.  If so, return True, else return  false
  if ((StravaDate - FitbitDate).days > -1):
    return True
  else:
    return False

#Forms the full URL to use for Fitbit.  Example:
#https://api.fitbit.com/1/user/-/activities/steps/date/2016-01-31/1d/1min/time/09:00/09:15.json
def FormFitbitURL(URLSt,DtTmSt,Dur):
  #First we need to add the date component which should be the first part of the date and time string we got from Strava.  Add the next few static bits as well
  FinalURL = URLSt + DtTmSt[0:10] + "/1d/1min/time/"

  #Now add the first time part which is also provided as a parameter.  This will take us back to the start of the minute Strava started, which is what we want
  FinalURL = FinalURL + DtTmSt[11:16] + "/"

  #Now we need to compute the end time which needs a bit of maths as we need to turn the start date into a Python date object and then add on elapsed seconds,
  #turn back to a string and take the time part
  StravaStartDateTime = datetime.strptime(DtTmSt,"%Y-%m-%dT%H:%M:%SZ")

  #Now add elapsed time using time delta function
  StravaEndDateTime = StravaStartDateTime + timedelta(seconds=int(Dur))
  EndTimeStr = str(StravaEndDateTime.time())

  #Form the final URL
  FinalURL = FinalURL + EndTimeStr[0:5] + ".json"
  return FinalURL


#@@@@@@@@@@@@@@@@@@@@@@@@@@@This is the main part of the code
#Open the file to use
MyFile = open(StepsLogFile,'w')

#Loop extracting data.  Remember it comes in pages.  Initialise variables first, including the tokens to use
EndFound = False
LoopVar = 1
AccessToken = ""
RefreshToken = ""

#Get the tokens from the config file
AccessToken, RefreshToken = GetConfig()

#Main loop - Getting all activities
while (EndFound == False):
  #Do a HTTP Get - First form the full URL
  ActivityURL = BaseURLActivities + str(LoopVar)
  StravaJSONData = urllib2.urlopen(ActivityURL).read()

  if StravaJSONData != "[]":   #This checks whether we got an empty JSON response and so should end
    #Now we process the JSON
    ActivityJSON = json.loads(StravaJSONData)

    #Loop through the JSON structure
    for JSONActivityDoc in ActivityJSON:
      #See if it was a run.  If so we're interested!!
      if (str(JSONActivityDoc["type"]) == "Run"):
        #We want to grab a date, a start time and a duration for the Fitbit API.  We also want to grab a distance which we'll use as a graph legend
        print "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
        StartDateTime = str(JSONActivityDoc["start_date_local"])
        StravaDuration = str(JSONActivityDoc["elapsed_time"])
        StravaDistance = str(JSONActivityDoc["distance"])

        StravaName = str(JSONActivityDoc["name"])

        #See if it's after 2015-01-26 which is when I got my Fitbit
        if CheckAfterFitbit(StartDateTime):
          #Tell the user what we're doing
          print "Strava Date and Time: " +  StartDateTime
          print "Strava Duration: " + StravaDuration
          print "Strava Distance: " + StravaDistance

          #Form the URL to use for Fitbit
          FitbitURL = FormFitbitURL(FitbitURLStart,StartDateTime,StravaDuration)
          print "Am going to call FitbitAPI with: " + FitbitURL

          #Make the API call
          APICallOK, APIResponse = MakeAPICall(FitbitURL, AccessToken, RefreshToken)
          #See how this came back.
          if not APICallOK:    #An error in the response.  If we refreshed tokens we go again.  Else we exit baby!
            if (APIResponse == TokenRefreshedOK):
              #Just make the call again
              APICallOK, APIResponse = MakeAPICall(FitbitURL, AccessToken, RefreshToken)
            else:
              print "An error occurred when I made the Fitbit API call.  Going to have to exit"
              sys.exit(0)

          #If we got to this point then we must have got an OK response.  We need to process this into the text file.  Format is:
          #DateTime_Distance,Name,MinuteWithinRun,TimeOfDay,Steps
          #print APIResponse
          ResponseAsJSON = json.loads(APIResponse)
          MinNum = 1    #Use this to keep track of the minute within the run, incrementing each time
          for StepsJSON in ResponseAsJSON["activities-steps-intraday"]["dataset"]:
            OutString = StartDateTime + "_" + StravaDistance + "," + StravaName + "," + str(MinNum) + "," + str(StepsJSON["time"]) + "," + str(StepsJSON["value"]) + "\r\n"
            #Write to file
            MyFile.write(OutString)
            #Increment the loop var
            MinNum += 1

    #Set up for next loop
    LoopVar += 1
  else:
    EndFound = True

#Close the log file
MyFile.close()