Sunday, February 1, 2015

Deploy a MongoDB powered Flask app in 5 minutes

This is a quick tutorial on deploying a web service (a social network) with an LNMP (Linux, Nginx, MongoDB, Python) stack on any IaaS cloud. The repo is on GitHub at https://github.com/dapangmao/minitwit-mongo-ubuntu.

Stack

The stack is built on the tools from the Python ecosystem listed below.

Tool            Name               Advantage
Cloud           DigitalOcean       Cheap but fast
Server distro   Ubuntu 14.10 x64   Ships the latest packages
WSGI server     Gunicorn           Manages workers automatically
Web proxy       Nginx              Fast and easy to configure
Framework       Flask              Single-file approach for MVC
Data store      MongoDB            Schema-less and scalable
DevOps          Fabric             Agentless and Pythonic
In addition, Supervisor runs on the server as a daemon that keeps the Gunicorn-Flask process alive.
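For illustration only, here is a minimal sketch (not the actual fabfile.py from the repo) of how such a Supervisor program section could be pushed with Fabric; the program name, paths, and port are hypothetical.
from fabric.api import put, run
from StringIO import StringIO

SUPERVISOR_CONF = """
[program:minitwit]
command=/usr/local/bin/gunicorn -w 4 -b 127.0.0.1:8000 minitwit:app
directory=/var/www/minitwit
autostart=true
autorestart=true
"""

def configure_supervisor():
    # Upload the hypothetical program section and ask Supervisor to pick it up
    put(StringIO(SUPERVISOR_CONF), '/etc/supervisor/conf.d/minitwit.conf')
    run('supervisorctl reread && supervisorctl update')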

The MiniTwit app

The MiniTwit application is an example shipped with Flask: a prototype of a Twitter-like multi-user social network. The original application depends on SQLite, but the data store can be swapped for a NoSQL backend such as Google Datastore or MongoDB. A live MiniTwit demo is hosted at http://minitwit-123.appspot.com/public
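To give a flavor of that swap, here is a hedged sketch of how the timeline query might look with PyMongo instead of SQLite; the collection and field names are hypothetical and not necessarily those used in the repo.
from pymongo import MongoClient, DESCENDING

db = MongoClient().minitwit
# Roughly replaces the SQLite "ORDER BY pub_date DESC LIMIT 30" timeline query
for message in db.message.find().sort('pub_date', DESCENDING).limit(30):
    print message['text']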

Deployment

1. Install Fabric and clone the Github repo
The DevOps tool is Fabric, which simply works over SSH. The fabfile.py and the staging Flask files are stored on GitHub. Before the deployment, install Fabric and download fabfile.py to the local machine.
sudo pip install fabric 
wget https://raw.githubusercontent.com/dapangmao/minitwit-mongo-ubuntu/master/fabfile.py
fab -l
2. Enter the IP of the virtual machine
A new VM from a cloud provider usually comes with an email containing the IP address and the root password. Modify the head of fabfile.py accordingly. There are quite a few cloud providers cheaper than Amazon EC2 for prototyping; for example, a minimal instance from DigitalOcean costs only five dollars a month. If an SSH key has been uploaded, the password can be omitted.
env.hosts = ['YOUR IP ADDRESS']  # <--------- enter the IP address
env.user = 'root'
env.password = 'YOUR PASSWORD'  # <--------- enter the root password
3. Fire up Fabric
Now it is time to deploy the application. With the command below, Fabric will install pip, git, nginx, gunicorn, supervisor and the latest MongoDB, and configure them sequentially. In less than 5 minutes, a Flask and MongoDB application will be ready for use. Since DigitalOcean has its own software repository for Ubuntu and its VMs run on SSDs, the deployment is even faster, usually finishing within one minute.
fab deploy_minitwit

Thursday, January 8, 2015

Spark practice (4): malicious web attack

Suppose a website tracks user activities to prevent robotic attacks. Design an algorithm to identify user IDs that accumulate more than 500 clicks within any 10-minute window.
sample.txt (columns: anonymousUserID, timeStamp, clickCount):
123    9:45am    10
234    9:46am    12
234    9:50am    20
456    9:53am    100
123    9:55am    33
456    9:56am    312
123    10:03am    110
123    10:16am    312
234    10:20am    201
456    10:23am    180
123    10:25am    393
456    10:27am    112
999    12:21pm    888

Thought

This is a typical stream-processing problem. The key is to slide a fixed-length window through the data, count the clicks inside it, and return the possibly malicious IDs.

Single machine solution

Two data structures are used: a queue and a hash table. The queue scans the data and keeps only the entries inside a 10-minute window; each time a new entry arrives, the old ones that fall out of the window are popped. The hash table counts the clicks currently in the queue and is updated as the queue changes. Any ID whose count exceeds 500 is added to a set.
from datetime import datetime
import time
from collections import deque

def get_minute(s, fmt = '%I:%M%p'):
    return time.mktime(datetime.strptime(s, fmt).timetuple())

def get_diff(s1, s2):
    return int(get_minute(s2) - get_minute(s1)) / 60

def find_ids(infile, duration, maxcnt):
    queue, htable, ans = deque(), {}, set()
    with open(infile, 'rt') as _infile:
        for l in _infile:
            line = l.split()
            line[2] = int(line[2])
            current_id, current_time, current_clk = line
            # Accumulate the clicks of the current ID
            if current_id not in htable:
                htable[current_id] = current_clk
            else:
                htable[current_id] += current_clk
            queue.append(line)
            # Pop entries that have fallen out of the 10-minute window
            while queue and get_diff(queue[0][1], current_time) > duration:
                past_id, _, past_clk = queue.popleft()
                htable[past_id] -= past_clk
            # Flag the ID once its in-window count exceeds the threshold
            if htable[current_id] > maxcnt:
                ans.add(current_id)
    return ans

if __name__ == "__main__":
    print find_ids('sample.txt', 10, 500)
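For the sample above, find_ids should flag the IDs 123 (312 + 393 = 705 clicks between 10:16am and 10:25am) and 999 (888 clicks in a single record).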

Cluster solution

The newest Spark (version 1.2.0) has started to support Python streaming, but the documentation is still scarce. I will wait and see whether this problem can be solved with the new API.
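Purely as an illustration, here is a rough, untested sketch of how the windowed count might look with the new Python streaming API; the socket source, batch interval, and exact window sizes are assumptions, not part of the original problem.
import pyspark
from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext()
ssc = StreamingContext(sc, 60)  # one batch per minute

# Each record looks like: "anonymousUserID timeStamp clickCount"
pairs = ssc.socketTextStream('localhost', 9999) \
    .map(lambda l: l.split()) \
    .map(lambda x: (x[0], int(x[2])))

# Sum clicks per user over a 10-minute window sliding every minute,
# then keep the IDs whose in-window count exceeds 500
pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 600, 60) \
    .filter(lambda kv: kv[1] > 500) \
    .pprint()

ssc.start()
ssc.awaitTermination()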
To be continued

Tuesday, December 23, 2014

Spark practice (3): clean and sort Social Security numbers

Requirements:
1. separate valid SSNs from invalid SSNs
2. count the number of valid SSNs
sample.txt:
402-94-7709 
283-90-3049 
124-01-2425 
1231232
088-57-9593 
905-60-3585 
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is common in many file systems. The trick to speed this up on Spark is to build a numerical key and use the sortByKey operator. In addition, an accumulator provides a shared variable that exists across the machines in a cluster, which is especially useful for counting data.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0
with open('sample.txt', 'rb') as infile, open('sample_bad.txt', 'wb') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1   # -1 marks an invalid SSN
        if l.isdigit() and len(l) == 9:
            key = int(l)
        if len(nums) == 3 and map(len, nums) == [3, 2, 4]:
            # Collapse the three dash-separated parts into one 9-digit number
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        else:
            if key not in htable:
                htable[key] = l
                valid_cnt += 1

with open('sample_sorted.txt', 'wb') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print valid_cnt
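For the sample above, valid_cnt should be 7, since the duplicated 402-94-7709 is counted only once; the two malformed entries, 1231232 and 44-82-8341, are written to sample_bad.txt.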

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    l = l.strip()
    nums = l.split('-')
    cdn1 = (l.isdigit() and len(l) == 9)
    cdn2 = (len(nums) == 3 and map(len, nums) == [3, 2, 4])
    return cdn1 or cdn2

def set_key(l):
    global valid_cnt
    valid_cnt += 1
    l = l.strip()
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

rdd = sc.textFile('sample.txt')
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

rdd2 = rdd.filter(is_validSSN).distinct() \
    .map(lambda x: set_key(x)) \
    .sortByKey().map(lambda x: x[1])

for x in rdd1.collect():
    print 'Invalid SSN\t', x

for x in rdd2.collect():
    print 'valid SSN\t', x

print '\nNumber of valid SSNs is {}'.format(valid_cnt.value)

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()

Friday, December 12, 2014

Spark practice (2): query text using SQL

Given a class of children, use SQL to find those who are male and weigh more than 100.
class.txt (columns: Name, Sex, Age, Height, Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0 
Barbara F 13 65.3 98.0 
Carol F 14 62.8 102.5 
Henry M 14 63.5 102.5 
James M 12 57.3 83.0 
Jane F 12 59.8 84.5 
Janet F 15 62.5 112.5 
Jeffrey M 13 62.5 84.0 
John M 12 59.0 99.5 
Joyce F 11 51.3 50.5 
Judy F 14 64.3 90.0 
Louise F 12 56.3 77.0 
Mary F 15 66.5 112.0 
Philip M 16 72.0 150.0 
Robert M 12 64.8 128.0 
Ronald M 15 67.0 133.0 
Thomas M 11 57.5 85.0 
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data into structured data. In this question, a schema with column names and types has to be applied so that SQL can query the plain text.

Single machine solution

Straightforward and simple with Python's built-in sqlite3 module.
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
             (Name text, Sex text, Age real, Height real, Weight real)""")

with open('class.txt') as infile:
    for l in infile:
        line = l.split()
        c.execute('INSERT INTO class VALUES (?,?,?,?,?)', line)
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
    print x
conn.close()
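For the class roster above, the query should return six boys: Alfred, Henry, Philip, Robert, Ronald and William.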

Cluster solution

Spark SQL integrates with Hive and seamlessly queries semi-structured data such as JSON. The first step is to dump the data as JSON files on the file system.
import os
import subprocess
import json
from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext()
hiveCtx = HiveContext(sc)
def trans(x):
    return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]), \
           'Height': float(x[3]), 'Weight': float(x[4])}
# Remove the output directory for JSON if it exists
if 'class-output' in os.listdir('.'):
    subprocess.call(['rm', '-rf', 'class-output'])

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(lambda x: trans(x))
rdd1.map(lambda x: json.dumps(x)).saveAsTextFile('class-output')

infile = hiveCtx.jsonFile("class-output/part-00000")
infile.registerTempTable("class")

query = hiveCtx.sql("""SELECT * FROM class WHERE Sex = 'M' AND Weight > 100
      """)
for x in query.collect():
    print x

sc.stop()
In conclusion, JSON should be considered if SQL is desired on Spark.

Sunday, December 7, 2014

Spark practice (1): find the stranger that shares the most friends with me

Given the friend pairs in the sample text below (each line contains two people who are friends), find the stranger that shares the most friends with me.
sample.txt
me Alice
Henry me
Henry Alice
me Jane
Alice John
Jane John
Judy Alice
me Mary
Mary Joyce
Joyce Henry
Judy me
Judy Jane
John Carol 
Carol me
Mary Henry
Louise Ronald
Ronald Thomas
William Thomas

Thoughts

The scenario is commonly seen in a social network. Spark offers three approaches to query such data:
  • MapReduce
  • GraphX
  • Spark SQL
Starting with the simplest MapReduce approach, I use two hash tables in Python. First I scan all friend pairs and store each person's friends in a hash table. Second, I use another hash table to count my friends' friends and pick out the ones who are strangers to me.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable1 = {}
with open('sample.txt', 'rb') as infile:
    for l in infile:
        line = l.split()
        if line[0] not in htable1:
            htable1[line[0]] = [line[1]]
        else:
            htable1[line[0]] += [line[1]]
        if line[1] not in htable1:
            htable1[line[1]] = [line[0]]
        else:
            htable1[line[1]] += [line[0]]

lst = htable1['me']
htable2 = {}
for key, value in htable1.iteritems():
    if key in lst:
        for x in value:
            if x not in lst and x != 'me': # should only limit to strangers
                if x not in htable2:
                    htable2[x] = 1
                else:
                    htable2[x] += 1

for x in sorted(htable2, key = htable2.get, reverse = True):
    print "The stranger {} has {} common friends with me".format(x, \
        htable2[x])
The result shows that John shares three friends with me, followed by Joyce with two. Therefore, John is the stranger most likely to be recommended to me by the social network.

Cluster solution

If the log file of friend pairs is quite big, say several GB, the single-machine solution cannot load the data into memory and we have to turn to a cluster.
Spark provides the pair RDD, which is similar to a hash table and is essentially a key-value structure. To translate the single-machine solution to a cluster one, I use operators from Spark's Python API including map, reduceByKey, filter, union and sortByKey.
#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
# Load data from hdfs
rdd = sc.textFile('hdfs://sample.txt') 
# Build the first pair RDD
rdd1 = rdd.map(lambda x: x.split()).union(rdd.map(lambda x: x.split()[::-1]))
# Bring my friend list to local
lst = rdd1.filter(lambda x: x[0] == 'me').map(lambda x: x[1]).collect()
# Build the second pair RDD
rdd2 = rdd1.filter(lambda x: x[0] in lst).map(lambda x: x[1]) \
    .filter(lambda x: x != 'me' and x not in lst) \
    .map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b) \
    .map(lambda (x, y): (y, x)).sortByKey(ascending = False)
# Save the result to hdfs
rdd2.saveAsTextFile("hdfs://sample_output")
# Bring the result to local since the sample is small
for x, y in rdd2.collect():
    print "The stranger {} has {} common friends with me".format(y, x)

sc.stop()
The result is the same. In this experiment, most of the time is spent loading the data from HDFS into memory; the subsequent MapReduce operations cost only a small fraction of the overall time. In conclusion, Spark fits well for iterative data analysis against an RDD that is already in memory.

Friday, December 5, 2014

Use a vector to print Pascal's triangle in SAS

Yesterday Rick Wicklin showed a cool SAS/IML function that uses a matrix to print Pascal's triangle. I came up with an alternative solution that uses a vector in SAS/IML.

Method

Two functions are used: a main function PascalRule and a helper function _PascalRule. The helper function recycles the vector each iteration and fills in the updated values; the main function grows the vector from length 1 to n.

Pro

Gets the nth row directly, for example, PascalRule(10) returns the 10th row; no matrix or matrix-related operators are needed; less memory is used, which allows a bigger n.

Con

More lines of code; slower to print the whole triangle, since there is no data structure such as a matrix to remember the intermediate rows.
proc iml;
    /* The main function */
    start PascalRule(n);
        if n <= 1 then
            return({1});
        answer = {1, 1};
        do i = 1 to n - 2 ;
            answer = _PascalRule(answer);
        end;
        return(answer);
    finish;
    /* The helper function */
    start _PascalRule(vector);
        previous = 1;
        do i = 2 to nrow(vector);
            current = vector[i];
            vector[i] = previous + current;
            previous = current;
        end;
        vector = vector // {1};
        return(vector);
    finish;
    /* Print the pascal's triangle */
    do i = 1 to 10;
        x = PascalRule(i);
        x = x`;
        print x;
    end;
quit;
Theoretically, Rick's solution has a time complexity of O(N^2) and a space complexity of O(N^2), while my solution has a time complexity of O(N^3) (unfortunately three levels of do loops are needed in IML) and a space complexity of O(N). It is essentially a trade-off between speed and memory cost.

Friday, November 28, 2014

Minimize complexity by Spark

There is always a trade-off between time complexity and space complexity in computer programs: decreasing the time cost tends to increase the space cost, and vice versa. The ideal solution is to parallelize the program across multiple cores on a multi-core computer, or even scale it out to multiple machines in a cluster, which would eventually reduce both the time cost and the space cost.
Spark is currently the hottest platform for cluster computing on top of Hadoop. Its Python interface provides map, reduce and many other methods, which allow a MapReduce job to be written in a straightforward way and therefore make it easy to migrate an algorithm from a single machine to a cluster of many machines.
  • Minimize space complexity
Consider the question of finding the single number in an array where every other number appears in pairs.
Single Number
  Given an array of integers, every element appears twice except for one.
    Find that single one.
    Note:
    Your algorithm should have a linear runtime complexity.
    Could you implement it without using extra memory? 
The optimal space complexity for this question is O(1), using the bitwise operator xor. On a cluster, since Spark spreads the data across machines, the memory load per machine drops to roughly 1/k of the total, where k is the number of machines in the cluster. (A single-machine baseline is sketched after the Spark example below.)
# Space.py
import pyspark
from random import shuffle
from operator import xor
sc = pyspark.SparkContext()

# Create the test case and the target is 99
testCase = range(0, 99) + range(0, 100)
shuffle(testCase)
# Run the testing with Spark
result = sc.parallelize(testCase).reduce(xor)
# Show the result
print result
sc.stop()
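For comparison, the single-machine baseline mentioned above keeps only one running value; it is the O(1)-space version that the Spark job distributes.
# The same fold with xor on a single machine
from operator import xor
from random import shuffle

testCase = range(0, 99) + range(0, 100)   # the lone number is 99
shuffle(testCase)
print reduce(xor, testCase)               # prints 99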
  • Minimize time complexity
Consider the question of implementing a function (or method) that returns the square root of an integer.
Sqrt(x)
Implement int sqrt(int x).
Compute and return the square root of x.
The optimal solution achieves a time complexity of O(lgN) by using binary search. If we hand the sqrt function to Spark, the time cost will decrease to roughly O(lgN/k), where k is the number of machines in the cluster.
# Time.py
import pyspark
sc = pyspark.SparkContext()
# Implement binary search for square root function
def sqrt(x):
    if x < 0 or not isinstance(x, int):
        raise ValueError
    hi, lo = x/2 + 1, 0
    while hi >= lo:
        mid = (hi + lo) / 2
        if mid * mid > x:
            hi = mid - 1
        else:
            lo = mid + 1
    return int(hi)

# Test the square root algorithm
testCase = sc.parallelize(xrange(1, 100))
result = testCase.map(lambda x: sqrt(x))
# Show the result
for x in result.collect():
    print x
sc.stop()
  • Find the worst rating by accounts
Consider the question of finding the worst of the rating letters for each account number.
sample.txt is below:
Account_number    Rating
1            A
1            B
2            A
2            B
2            C
3            A
3            C
The desired result should be:
1            B
2            C
3            C
The question is essentially a grouping question. Spark's pair RDD, which captures the key-value relationship, supplies a one-line solution for it.
import pyspark
sc = pyspark.SparkContext()

# Squeeze the letters by keys
rdd = sc.textFile('sample.txt')
result = rdd.map(lambda x: x.split()).filter(lambda x: x[0].isdigit()) \
    .map(lambda x: (x[0], x[1])).reduceByKey(max)
# Show the result
for x in result.collect(): 
    print x
sc.stop()
In conclusion, Spark significantly changes the way we think about data analysis.