Monday, October 20, 2014

Automated testing with pytest

The hardest part of testing is writing the test cases, which is time-consuming and error-prone. Fortunately, besides Python's built-in modules such as doctest and unittest, there are quite a few third-party packages that can help with automated testing. My favorite is pytest, which has a proven track record and pleasant syntactic sugar.

Step 1: test-driven development

For example, there is a coding challenge on Leetcode:
Find Minimum in Rotated Sorted Array
Suppose a sorted array is rotated at some pivot unknown to you beforehand.
(i.e., 0 1 2 4 5 6 7 might become 4 5 6 7 0 1 2).
Find the minimum element.
You may assume no duplicate exists in the array.
The straightforward way to find the minimum element in an array (or list in Python) is a sequential search, which goes through every element and has a time complexity of O(N). If the array is sorted, the minimum is simply the first element, which costs only O(1).
However, this question provides a rotated sorted array, which suggests a binary search and reduces the complexity from O(N) to O(log N).
As usual, write the test cases first. The great thing about pytest is that it significantly simplifies the effort of coding the test cases: in this example, I only use 3 lines to generate 101 test cases that cover every rotation from 0 to 99, plus a null test.
The next step is to code the function. It is easy to transplant the iterative binary search to this question. If the two pointers span a sorted segment, return the leftmost element as the minimum. Otherwise, adjust the left and right boundaries.
# test1.py
import pytest

# Prepare 101 test cases
array = list(range(100))
_testdata = [[array[i:] + array[:i], 0] for i in range(100)]
_testdata += [pytest.param([], None, marks=pytest.mark.empty)]

# Code the initial binary search function
def findMinPrev(num):
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid - 1
        else:
            lo = mid + 1

@pytest.mark.parametrize('input, expected', _testdata)
def test_findMinPrev(input, expected):
    assert findMinPrev(input) == expected
After running the py.test -v test1.py command, part of the output is shown below. 65 tests passed and 36 failed; the failing cases return much larger values, which suggests the search runs past the true minimum, and the boundary adjustment may be too aggressive.
test1.py:20: AssertionError
_________________________ test_findMinPrev[input98-0] _________________________

input = [98, 99, 0, 1, 2, 3, ...], expected = 0

    @pytest.mark.parametrize('input, expected', _testdata)
    def test_findMinPrev(input, expected):
>       assert findMinPrev(input) == expected
E       assert 98 == 0
E        +  where 98 = findMinPrev([98, 99, 0, 1, 2, 3, ...])

test1.py:20: AssertionError
==================== 36 failed, 65 passed in 0.72 seconds =====================
Now I adjust the right boundary slightly and finally come up with a solution that passes all the tests.
def findMin(num):
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid
        else:
            lo = mid + 1
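As a quick sanity check outside pytest, the fixed function can be exercised against every rotation directly. This is a minimal Python 3 restatement (note the integer division //, which Python 3 requires where Python 2's / sufficed):

```python
def find_min(num):
    # Binary search over a rotated sorted list (Python 3, hence //)
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:       # this segment is already sorted
            return num[lo]
        mid = (lo + hi) // 2
        if num[mid] < num[hi]:       # minimum lies at mid or to its left
            hi = mid
        else:                        # minimum lies to the right of mid
            lo = mid + 1

array = list(range(100))
assert all(find_min(array[i:] + array[:i]) == 0 for i in range(100))
assert find_min([]) is None          # the null test falls through the loop
```

The key difference from the failing draft is hi = mid rather than hi = mid - 1: when num[mid] < num[hi], mid itself may still be the minimum, so it cannot be excluded.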

Step 2: performance profiling

Besides a correct solution, I am also interested in whether the binary search has indeed improved performance. For this step I choose line_profiler, given its ability to profile line by line. I take the most basic method (the sequential search) as the benchmark, and also include the built-in min function, since built-in functions like it are implemented in C and run much faster than explicit Python loops. The test case is a rotated sorted array with 10 million elements.
# test2.py
from line_profiler import LineProfiler
from sys import maxint

@profile
def findMinRaw(num):
    """Sequential searching"""
    if not num:
        return 
    min_val = maxint
    for x in num:
        if x < min_val:
            min_val = x
    return min_val

@profile
def findMinLst(num):
    """Searching by list comprehension"""
    if not num:
        return
    return min(num)

@profile
def findMin(num):
    """"Binary search"""
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) / 2
        if num[mid] < num[hi]:
            hi = mid
        else:
            lo = mid + 1

# Prepare a rotated array
array = list(range(10000000))
_testdata = array[56780:] + array[:56780]
# Test the three functions
findMinRaw(_testdata)
findMinLst(_testdata)
findMin(_testdata)
After running kernprof -l -v test2.py, I get the output below. The sequential search hits the loop 10,000,001 times and costs almost 14 seconds. The min function encapsulates all the details in C and uses 0.5 seconds, which is roughly 27 times faster. The binary search, by contrast, only takes 20 loops to find the minimum and spends just 0.0001 seconds. When dealing with large data, a better algorithm really saves time.
Total time: 13.8512 s
File: test2.py
Function: findMinRaw at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     4                                           @profile
     5                                           def findMinRaw(num):
     6         1           13     13.0      0.0      if not num:
     7                                                   return
     8         1            3      3.0      0.0      min_val = maxint
     9  10000001     16031900      1.6     47.5      for x in num:
    10  10000000     17707821      1.8     52.5          if x < min_val:
    11         2            5      2.5      0.0              min_val = x
    12         1            3      3.0      0.0      return min_val

Total time: 0.510298 s
File: test2.py
Function: findMinLst at line 15

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    15                                           @profile
    16                                           def findMinLst(num):
    17         1            4      4.0      0.0      if not num:
    18                                                   return
    19         1      1243016 1243016.0    100.0      return min(num)

Total time: 0.000101812 s
File: test2.py
Function: findMin at line 22

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    22                                           @profile
    23                                           def findMin(num):
    24         1           15     15.0      6.0      lo, hi = 0, len(num) - 1
    25        20           40      2.0     16.1      while lo <= hi:
    26        20           48      2.4     19.4          if num[lo] <= num[hi]:
    27         1            2      2.0      0.8              return num[lo]
    28        19           54      2.8     21.8          mid = (lo + hi) / 2
    29        19           50      2.6     20.2          if num[mid] < num[hi]:
    30         5           10      2.0      4.0              hi = mid
    31                                                   else:
    32        14           29      2.1     11.7              lo = mid + 1
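The same ordering can be reproduced without line_profiler, using only the standard library's timeit. This is a sketch; absolute numbers will vary by machine:

```python
from timeit import timeit

def find_min(num):
    # Binary search on a rotated sorted list
    lo, hi = 0, len(num) - 1
    while lo <= hi:
        if num[lo] <= num[hi]:
            return num[lo]
        mid = (lo + hi) // 2
        if num[mid] < num[hi]:
            hi = mid
        else:
            lo = mid + 1

array = list(range(10000000))
data = array[56780:] + array[:56780]

t_min = timeit(lambda: min(data), number=1)
t_bin = timeit(lambda: find_min(data), number=1)
print(t_min, t_bin)   # the binary search should be orders of magnitude faster
```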

Wednesday, September 24, 2014

One example of test-driven development in Python

Functions and methods are the most basic units in Python programming. Test-driven development is key for a developer to assure the quality of those units. In his book, Harry Percival illustrates a few great examples of how to use TDD with Python and Django. It seems that for web development, TDD, including unit testing and integration testing, is the cornerstone of every success. For data analysis, coding mostly relies on built-in packages instead of large frameworks like Django, which makes TDD easier. In my opinion, TDD in data analysis has three steps.
  • Step 1: requirement analysis
    Before writing any code for data analysis, the programmer should seriously ask the customer, or himself, about the requirements.
    • What are the input parameters?
    • What if the input data doesn't fit the assumptions?
    • What is the purpose of this function or method? What are the desired outputs?
  • For example, there is a recent coding challenge called Maximum Product Subarray.
      > Find the contiguous subarray within an array (containing at least one number) which has the largest product.
      For example, given the array [2,3,-2,4],
      the contiguous subarray [2,3] has the largest product = 6.
    
  • OK, understanding this question is quite straightforward. Given an array (or a list in Python), return the integer that is the maximum product of a contiguous subarray of the input array.
      def maxProduct(A):
          """ A function to find the maximum product value for a continuous subarray.
          :param A: an array or list
          :type A: list
          """
          if A is None or not isinstance(A, list):
              return None
          if len(A) == 1:
              return A[0]
          pass
    
    • A production version of the code above should look more like:
      class FunctionInputError(Exception):
        pass
      
      def maxProduct(A):
        """ A function to find the maximum product value for a continuous subarray.
        :param A: an array or list
        :type A: list
        """
        if A is None or not isinstance(A, list):
            raise FunctionInputError('must give a list as input')
        if len(A) == 1:
            return A[0]
        pass
      
  • Step 2: write test cases
    Given that not a single line of logic has been written yet, I call the current step black-box testing: I want this function to fail every test case. Python has a built-in module, doctest, which allows embedding the test cases within the docstring. I write six test cases, run the pseudo-function below, and arbitrarily specify the result to be -1. As expected, it fails all six test cases with horrible red warnings in the Python shell. That is a good thing: it proves that the testing works.
      import doctest
      def maxProduct(A):
          """ A function to find the maximum product value for a continuous subarray.
          :param A: an array or list
          :type A: list
    
          - testcase1
              >>> maxProduct([0, 2, 3,-2,4, 1, -1])
              48
    
          - testcase2
              >>> maxProduct([2, 3, 0, 2, 4, 0, 3, 4])
              12
    
          - testcase3
              >>> maxProduct([0, 5, 3, -1, -2, 0, -2, 4, 0, 3, 4])
              30
    
          - testcase4
              >>> maxProduct([0, 1, -3, -4, -5, -1, 1, 2, 1, -1, -100, 0, -100000])
              12000
    
          - testcase5
              >>> maxProduct([0, -3000, -2, 0, -100, -100, 0, -9, -8, 1, 1, 2])
              10000
    
          - testcase6
              >>> maxProduct([0, -2, 0])
              0
          """
          if A is None or not isinstance(A, list):
              return None
          if len(A) == 1:
              return A[0]
          return -1
    
      doctest.testmod()
    
  • Step 3: implement the logic
    It's time to tackle the most difficult part: writing the real function. Think about time complexity (ideally a single pass over the input array, which means O(n)) and space complexity (ideally no extra space). Run testmod() again and again to find mistakes and modify the code accordingly. Finally I come up with a solution using a helper function _maxProduct, and it passes all six test cases. Although I am not sure this function is free of bugs, at least it works for now.
      import doctest
      from sys import maxint
    
      def maxProduct(A):
          """ A function to find the maximum product value for a continuous subarray.
          :param A: an array or list
          :type A: list
    
          - testcase1
              >>> maxProduct([0, 2, 3,-2,4, 1, -1])
              48
    
          - testcase2
              >>> maxProduct([2, 3, 0, 2, 4, 0, 3, 4])
              12
    
          - testcase3
              >>> maxProduct([0, 5, 3, -1, -2, 0, -2, 4, 0, 3, 4])
              30
    
          - testcase4
              >>> maxProduct([0, 1, -3, -4, -5, -1, 1, 2, 1, -1, -100, 0, -100000])
              12000
    
          - testcase5
              >>> maxProduct([0, -3000, -2, 0, -100, -100, 0, -9, -8, 1, 1, 2])
              10000
    
          - testcase6
              >>> maxProduct([0, -2, 0])
              0
          """
          if A is None or not isinstance(A, list):
              return None
          if len(A) == 1:
              return A[0]
          return max(_maxProduct(A), _maxProduct([a for a in reversed(A)]))
    
      def _maxProduct(A):
          max_val_forward = 1
          rst = -maxint
          for a in A:
              if a != 0:
                  max_val_forward *= a
                  rst = max(rst, max_val_forward)
              else:
                  rst = max(0, rst)
                  max_val_forward = 1
          return rst
    
      if __name__ == "__main__":
          doctest.testmod()
    
In conclusion, the most important thing about TDD in data analysis is writing good test cases, which takes a lot of training and practice.

Sunday, August 10, 2014

Translate SAS's sas7bdat format to SQLite and Pandas

Thanks to Jared Hobbs' sas7bdat package, Python can read SAS's data sets quickly and precisely. It would be great to have a few extension functions that bridge this package with SQLite and Pandas.
The good things about transferring SAS libraries to SQLite:
  1. Size reduction:
    SAS's sas7bdat format is verbose. So far I have successfully loaded 40GB of SAS data into SQLite with an 85% reduction in disk usage.
  2. Saving the cost of SAS/ACCESS:
    SAS/ACCESS costs around $8,000 a year for a server, while SQLite is accessible to most common software.
The good things about transferring SAS data sets to Pandas:
  1. Pandas' powerful Excel interface:
    It writes very large Excel files quickly, as long as memory can hold the data.
  2. Validation of statistics:
    Pandas works well with statsmodels and scikit-learn, which makes it easy to validate SAS's outputs.
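A minimal sketch of the SQLite side of such an extension function. The stand-in DataFrame and the table name mydata are mine; real code would obtain the DataFrame from sas7bdat's to_data_frame():

```python
import sqlite3
import pandas as pd

# In real use the DataFrame would come from the sas7bdat package, e.g.
#   from sas7bdat import SAS7BDAT
#   df = SAS7BDAT('mydata.sas7bdat').to_data_frame()
df = pd.DataFrame({'id': [1, 2, 3], 'val': [0.5, 1.5, 2.5]})  # stand-in data

con = sqlite3.connect(':memory:')       # or a file path for a persistent database
df.to_sql('mydata', con, index=False)   # SAS data set -> SQLite table
back = pd.read_sql('SELECT * FROM mydata', con)
```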

Tuesday, June 10, 2014

Remove tabs from SAS code files

By default, SAS records indentation with the tab character, which causes many problems when the code files are used in a different environment. There are actually two ways to eliminate the tab character in SAS and replace it with spaces.
  • Regular expression
    Press Ctrl + H → the Replace window pops up → choose Regular expression search → in the Find text box, input \t → in the Replace box, input multiple spaces, say four

  • Editor option
    Click Tools → Options → Enhanced Editors… → Choose Insert spaces for tabs → Choose Replace tabs with spaces on file open
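Outside the SAS editor, the same cleanup can be scripted, e.g. in Python. This is a sketch; the four-space width is an assumption to match the editor option above:

```python
from pathlib import Path

def detab(path, spaces=4):
    """Replace every tab in a code file with spaces, rewriting it in place."""
    p = Path(path)
    p.write_text(p.read_text().replace('\t', ' ' * spaces))

# Usage: detab('myprogram.sas')
```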

Monday, May 26, 2014

Steps to deploy Flask's minitwit on Google App Engine

Flask is a light-weight web framework for Python, which is well documented and clearly written. Its GitHub repository provides a few examples, including minitwit. The minitwit website has a few basic features of a social network, such as following and login/logout. The demo site on GAE is http://minitwit-123.appspot.com. The GitHub repo is https://github.com/dapangmao/minitwit.
Google App Engine, or GAE, is a major public cloud service besides Amazon EC2. Among the four languages it supports (Java/Python/Go/PHP), GAE is friendly to Python users, possibly because Guido van Rossum worked there and personally created the Python datastore interface. For me, it is a good choice for a Flask app.

Step 1: download the GAE SDK and the GAE Flask skeleton

GAE's Python SDK tests the staging app and eventually pushes the app to the cloud.
A Flask skeleton can be downloaded from the Google Developer Console. It contains three files:
  • app.yaml: specifies the runtime entry point
  • appengine_config.py: adds external libraries such as Flask to the system path
  • main.py: the root Python program

Step 2: schema design

The database used for the original minitwit is SQLite. The schema consists of three tables: user, follower, and message, which together make a normalized database. GAE has two Datastore APIs: DB and NDB. Since neither of them supports joins (in this case, a one-to-many join from user to follower), I move the follower table into the user table as a repeated property, which eliminates the need for joining.
As a result, main.py has two data models: User and Message. They will create and maintain two kinds (or, as we might call them, tables) with the same names in Datastore.
class User(ndb.Model):
  username = ndb.StringProperty(required=True)
  email = ndb.StringProperty(required=True)
  pw_hash = ndb.StringProperty(required=True)
  following = ndb.IntegerProperty(repeated=True)
  start_date = ndb.DateTimeProperty(auto_now_add=True)

class Message(ndb.Model):
  author = ndb.IntegerProperty(required=True)
  text = ndb.TextProperty(required=True)
  pub_date = ndb.DateTimeProperty(auto_now_add=True)
  email = ndb.StringProperty(required=True)
  username = ndb.StringProperty(required=True)

Step 3: replace SQL statements

The next step is to replace the SQL operations in each routing function with NDB's methods. NDB's two fundamental methods are get(), which retrieves an entity from Datastore, and put(), which pushes an entity back to Datastore. In short, data is created and manipulated as individual objects.
For example, to add a follower to a user, I first retrieve the user by its ID, which returns an entity with the properties [username, email, pw_hash, following, start_date], where following itself is a list. Then I insert the new follower into the following list and save the entity back again.
u = User.get_by_id(cid)
if u.following is None:
  u.following = [whom_id]
  u.put()
else:
  u.following.append(whom_id)
  u.put()
People with experience in an ORM such as SQLAlchemy will be comfortable implementing these changes.

Step 4: testing and deployment

Without the schema file, minitwit is now a true single-file web app. It's time to use the GAE SDK to test it locally and eventually push it to the cloud. On GAE, we can check any errors or warnings through the Logs tab to find bugs, or view the raw data through the Datastore Viewer tab.
In conclusion, GAE has a few advantages and disadvantages for hosting a Flask web app.
  • Pro:
    • It allows up to 25 free apps (great for exercises)
    • Use of the database is free
    • Automatic memcache for high IO
  • Con:
    • The database is NoSQL, which makes data hard to port
    • More expensive for production than EC2

Wednesday, May 21, 2014

Use recursion and gradient ascent to solve logistic regression in Python

In his book Machine Learning in Action, Peter Harrington provides a solution for parameter estimation of logistic regression. I use pandas and ggplot to realize a recursive alternative. Compared with the iterative method, the recursion costs more space but may bring a performance improvement.
# -*- coding: utf-8 -*-
"""
Use recursion and gradient ascent to solve logistic regression in Python
"""

import pandas as pd
from numpy import exp, shape, ones, arange, array, mat
from ggplot import *

def sigmoid(inX):
    return 1.0/(1 + exp(-inX))

def grad_ascent(dataMatrix, labelMat, cycle):
    """
    A function to use gradient ascent to calculate the coefficients
    """
    if not isinstance(cycle, int) or cycle < 0:
        raise ValueError("Must be a valid value for the number of iterations")
    m, n = shape(dataMatrix)
    alpha = 0.001
    if cycle == 0:
        return ones((n, 1))
    else:
        weights = grad_ascent(dataMatrix, labelMat, cycle-1)
        h = sigmoid(dataMatrix * weights)
        errors = (labelMat - h)
        return weights + alpha * dataMatrix.transpose()* errors

def plot(vector):
    """
    A function to use ggplot to visualize the result
    """
    x = arange(-3, 3, 0.1)
    y = (-vector[0]-vector[1]*x) / vector[2]
    new = pd.DataFrame()
    new['x'] = x
    new['y'] = array(y).flatten()
    infile.classlab = infile.classlab.astype(str)
    p = ggplot(aes(x='x', y='y', colour='classlab'), data=infile) + geom_point()
    return p + geom_line()

# Use pandas to manipulate data
if __name__ == '__main__':
    infile = pd.read_csv("https://raw.githubusercontent.com/pbharrin/machinelearninginaction/master/Ch05/testSet.txt", sep='\t', header=None, names=['x', 'y', 'classlab'])
    infile['one'] = 1
    mat1 = mat(infile[['one', 'x', 'y']])
    mat2 = mat(infile['classlab']).transpose()
    result1 = grad_ascent(mat1, mat2, 500)
    print plot(result1)
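The recursion itself can be exercised against a small synthetic data set using only NumPy. This is a Python 3 sketch of my own; the generated data and the 200-cycle count are assumptions standing in for testSet.txt:

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def grad_ascent(X, y, cycles, alpha=0.001):
    # Weights after n cycles are defined recursively from cycle n-1
    if cycles == 0:
        return np.ones((X.shape[1], 1))
    w = grad_ascent(X, y, cycles - 1, alpha)
    errors = y - sigmoid(X @ w)        # gradient of the log-likelihood
    return w + alpha * X.T @ errors

# Synthetic, linearly separable data: an intercept column plus two features
rng = np.random.default_rng(0)
X = np.hstack([np.ones((100, 1)), rng.normal(size=(100, 2))])
y = (X[:, 1:2] + X[:, 2:3] > 0).astype(float)
w = grad_ascent(X, y, 200)
```

The recursion unwinds to exactly the same update sequence as the iterative loop, so both produce identical weights; the recursive form simply keeps all intermediate frames on the call stack.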