分类 算法 下的文章

流量控制算法

很多场景下都需要流量控制,从底层的数据包传输到购物秒杀。

常见的流量控制算法有两种,一个是漏桶算法,桶本身具有一个恒定的速率往下流水,而上方一直有水进入桶中。当桶还未满时,上方的水可以加入。一旦水满,上方的水就溢出,可以认为请求被丢弃。

这个算法不难理解,可以保证系统的稳定,因为水流出的速度一直是不变的,即使在有突发流量的情况下。但是如果到了网站秒杀的情境下,有突发流量,这个算法就不好用了。更常用的是令牌通算法。

令牌桶还是可以认为是一个盛满了令牌的箱子,每处理一个请求就会拿走一个令牌,如果令牌没有了,请求就会失败。而根据速率限制,令牌还是一直在添加的,所以还是有一部分的请求是可以慢慢的拿到令牌的。而一开始箱子里面是有一些令牌的,一开始的突发流量都可以成功的拿到令牌。

根据 https://github.com/celery/kombu/blob/master/kombu/utils/limits.py , 我简单改写了一下,可以很直观的看出来流量控制结果。

# coding=utf-8
import time


class TokenBucket(object):
    def __init__(self, fill_rate, capacity):
        self.capacity = float(capacity)
        self._left_tokens = capacity
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()

    def consume(self, tokens=1):
        """Return :const:`True` if the number of tokens can be consumed
        from the bucket.  If they can be consumed, a call will also consume the
        requested number of tokens from the bucket. Calls will only consume
        `tokens` (the number requested) or zero tokens -- it will never consume
        a partial number of tokens."""
        if tokens <= self.tokens:
            self._left_tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Return the time (in seconds) when a new token is expected
        to be available. This will not consume any tokens from the bucket."""
        _tokens = self.tokens
        tokens = max(tokens, _tokens)
        return (tokens - _tokens) / self.fill_rate

    @property
    def tokens(self):
        if self._left_tokens < self.capacity:
            now = time.time()
            delta = self.fill_rate * (now - self.timestamp)
            self._left_tokens = min(self.capacity, self._left_tokens + delta)
            self.timestamp = now
        return self._left_tokens


true = false = 0

# 每秒往桶里面添加5个令牌,桶的大小是50
bucket = TokenBucket(fill_rate=5, capacity=50)
for i in range(300):
    time.sleep(0.1)
    if bucket.consume():
        true += 1
        print "Accepted", i
    else:
        false += 1
        print "Dropped, time left ", bucket.expected_time()
print true, false

要注意的是,我上面的demo代码是存在存在并发竞态条件问题的,但是出于性能和简化写法的考虑,这个是可以接受的,毕竟流量限制一般也不需要太精确。

winnowing 算法

最近尝试将 winnowing 算法应用于抄袭检测中,这是一种字符串指纹算法,详细的内容见下面这篇论文。

Winnowing Local Algorithms for Document Fingerprinting.pdf

k-grams 是最常见的指纹算法了,它将一个大字符串分为很多小字符串,而每个小字符串仅仅差一个字符,是一种错位的排列。比如 adorunrunrunadorunrun,进行 5-grams 之后就是

adoru dorun orunr runru 
unrun nrunr runru unrun 
nruna runad unado nador 
adoru dorun orunr runru 
unrun

k 就是每个小字符串的长度。

然后对这个小字符串组计算 hash,并抽样得到大字符串的特征值。抽样的算法有很多,不同的算法可能会影响最后的精确度,比如 hash 结果都是数字,就取出 hash % p = 0的,但是缺点就是特征中可能遗漏大段的内容,造成结果不准确。其余的算法也是有自己的缺点,比如取最小值。

winnowing 就是要结果对 hash 取样的时候遇到的问题,它将 hash 类似 k-grams 一样进行分组,比如 hash 值是

77 72 42 17 98 50 17 98 8 88 67 39 77 72 42 17 98

假设我们希望任意t个字符之内(不包含t)的重复都能被查出来,那么我们就设置一个窗口 w = t - k + 1。举个例子,比如在上面 5-grams 的例子中我们希望 t = 8 ,那么我们就设置 w = 8 - 5 + 1 = 4,于是,得到了以下14个hash值的窗口:

(77, 74, 42, 17) (74, 42, 17, 98) (42, 17, 98, 50)
(17, 98, 50, 17) (98, 50, 17, 98) (50, 17, 98,  8)
(17, 98,  8, 88) (98,  8, 88, 67) ( 8, 88, 67, 39)
(88, 67, 39, 77) (67, 39, 77, 74) (39, 77, 74, 42)
(77, 74, 42, 17) (74, 42, 17, 98)

我们选出每个窗口中的最小值,因为相连几个窗口的最小值可能指的是文档中同一个 hash,这个时候我们只需要在这个 hash 第一次出现的那个窗口选择它就可以了,其它的窗口就不再需要选择最小值了。比如在这个例子中,前三个窗口的最小值都是17,而且指向的是同一个 hash ,所以我们只需要选择第一个就可以了,另外两个窗口都不需要选择,直到第四个窗口又出现了另外一个17。通过这个算法我们就可以选出以下5个 hash 值:

17 17 8 39 17

这个就是这整个文档的fingerprint。如果需要字符串出现的位置不影响结果的话,将 hash 值去重就好了。

下面是我使用 Python 简单的实现

# coding=utf-8
import hashlib


class Winnowing(object):
    def __init__(self, text, k=5, w=10):
        """
        :param text:
        :param k: 字符串分组长度
        :param w: 窗口长度
        :return:
        """
        self.text = text
        self.k = k
        self.w = w

    def _kgrams(self, l, size):
        """
        将 l 分组,每组大小是size,每组错位一个元素
        :param l: 
        :param size: 分组大小
        :return:
        """
        n = len(l)

        if n < size:
            yield l
        else:
            for i in xrange(n - size + 1):
                yield l[i:i+size]

    def _hash(self, string):
        return int(hashlib.md5(string).hexdigest()[:8], 16)

    def fingerprint(self):
        cur = 0
        result = []
        for item in self._kgrams(map(lambda x: self._hash(x), self._kgrams(self.text, self.k)), self.w):
            # 对于hash分组,每组都是找到最右边的最小值
            min_num = item[0]
            for i in range(len(item)):
                if item[i] <= min_num:
                    min_num = item[i]
            result.append(min_num)
            cur += 1
        return set(result)
print Winnowing("abcdefghijklmnopqrstuvwxyz").fingerprint()

参考 http://ytliu.info/blog/2013/12/16/fingerprintsuan-fa-cha-chao-xi/

动态规划学习

最长递增子序列问题

一个序列有N个数:A1,A[2],…,A[N],求出最长递增子序列的长度。

动态规划最重要的就是把大问题化小(有点像递归)还有就是找出状态转移方程。比如说以arr[i]结尾的最长自增子序列的长度为dp[i]的话,比如对于测试数据5,3,4,8,6,7来说:

  • 第一个数字5,dp[0] = 1
  • 第一个数字3,前面没有比他还小的了,dp1 = 1
  • 第三个数字4,最长的递增子序列就是3,4,dp[2] = 2
  • 第四个数组8,dp[3] = 3
  • 第五个数字6,d[4] = 3
  • 第六个数字7,dp[5] = 4

也就是说将所有的数字遍历一遍(arr[i]),然后对于这个数字前面的部分arr[j], j < i来说,再逐个的遍历,对于如果a[i] > a[j]的,这个时候dp[j] + 1就是当前的一个递增子序列的长度,如果dp[j] + 1 > dp[i]的话,就让dp[i] = dp[j + 1]更新最大值。

#include <iostream>

using namespace std;

int max(int a, int b)
{
    if(a >= b)
    {
        return a;
    }
    else
    {
        return b;
    }
}


int main()
{
    int arr[1000];
    int dp[1000];
    int i = 0;

    while(scanf("%d", &(arr[i])) == 1)
    {
        i += 1;
    }

    int res = 0;

    for(int m = 0;m < i;m ++)
    {
        dp[m] = 1;
        for(int n = 0;n < m;n++)
        {
            if(arr[n] < arr[m])
            {
                dp[m] = max(dp[m], dp[n] + 1);
            }
        }
        res = max(res, dp[m]);
    }
    printf("%d", res);

    return 0;
}

还有一个简单的例子

将一个由N行数字组成的三角形,如图所以,设计一个算法,计算出三角形的由顶至底的一条路径,使该路径经过的数字总和最大。
sanjiaoxing_thumb.png

思路基本一样

#include <iostream>

using namespace std;

struct node
{
    int value;
    int sum;
};

struct node node(int value)
{
    struct node n = {value, 0};
    return n;
}


struct node data[5][5] = {
                      {node(7)},
                 {node(3), node(8)},
              {node(8), node(1), node(0)},
         {node(2), node(7), node(4), node(4)},
    {node(4), node(5), node(2), node(6), node(6)}};


int max(int a, int b)
{
    if(a >= b)
        return a;
    return b;
}

int main()
{
    int i, j;
    data[0][0].sum = 7;

    for(i = 1;i < 5;i++)
    {
        for(j = 0;j <= i;j++)
        {
            if(j == 0 || j == i)
            {
                data[i][j].sum = data[i - 1][j].sum + data[i][j].value;
            }
            else
            {
                data[i][j].sum = max(data[i - 1][j].sum, data[i - 1][j - 1].sum) + data[i][j].value;
            }

        }
    }
    int result = -1;

    for(int m = 0; m < 5;m++)
    {
        if(data[4][m].sum > result)
        {
            result = data[4][m].sum;
        }
    }
    printf("%d", result);
    return 0;
}

最大连续子序列

http://acm.hdu.edu.cn/showproblem.php?pid=1231

#include <iostream>

using namespace std;

//sum[i]=max(sum[i-1]+a[i],a[i])

int max(int a, int b)
{
    if(a > b)
        return a;
    return b;
}

int main()
{
    int arr[10002];
    int n;
    scanf("%d", &n);

    if(!n)
        return 0;

    for(int i = 0;i < n;i++)
    {
        scanf("%d", &(arr[i]));
    }

    int sum[10002];

    sum[0] = arr[0];

    for(int i = 1;i < n;i++)
    {
        sum[i] = max(sum[i - 1] + arr[i], arr[i]);
    }

    int max = sum[0];
    int l = 0;
    for(int i = 1;i < n;i++)
    {
        if(sum[i] > max)
        {
            max = sum[i];
            l = i;
        }
    }

    if(max < 0)
    {
        int flag = -1;
        for(int i = 0;i < n;i++)
        {
            if(arr[i] > 0)
            {
                flag = 1;
                break;
            }
        }
        if(flag == -1)
        {
            printf("%d %d %d", 0, arr[0], arr[n - 1]);
            return 0;
        }
    }

    printf("%d", max);

    for(int i = l; i >= 0;i--)
    {
        max -= arr[i];
        if(!max)
        {
            printf(" %d %d\n", arr[i], arr[l]);
            return 0;
        }
    }


    return 0;
}

但是现在还是wrong answer,还在找原因,基本的测试数据是对的。

Factorial Trailing Zeroes

https://leetcode.com/problems/factorial-trailing-zeroes/

给定一个数字n,求n的阶乘的结果最后面有多少个0。

第一印象是这个题目有规律的,就先用python暴力计算了几十个结果,但是没发现什么规律。后来就想到,对每个数字都进行质因数分解,因为结果后面的0只能由2×5得到,而且分解结果肯定是2的个数多于5,也就是说分解结果中5个个数加起来就是了。

分解质因数都是针对非质数来说的,而且5需要作为一个因子,首先判断结尾是不是0或者5,如果不是的话,直接返回0就是了。否则才进行分解。

class Solution(object):
    def f(self, num):
        if num % 5 !=0:
            return 0
        else:
            count = 0
            while num:
                if num % 5 == 0:
                    count += 1
                num = num / 5
            return count

    def trailingZeroes(self, n):
        r = 0
        for i in range(1, n + 1):
            r += self.f(i)
        return r

但是这个代码在n比较大的时候超时。

然后经过讨论,其实也有规律的。比如150的阶乘,绝大多数0和5结尾的只能分解出一个5来,而25,50, 75, 100之类的都含有两个5,而125含有3个5,这样的话,每次去除5^m,m从1开始递增。

class Solution:
    # @return an integer
    def trailingZeroes(self, n):
        r = 0
        b = 1
        while n / pow(5, b):
            r += n / pow(5, b)
            b += 1
        return r

再推导一下就是

class Solution:
    # @return an integer
    def trailingZeroes(self, n):
        r = 0
        while n:
            t = n / 5
            r += t
            n = t

        return r

关于浮点运算

浮点运算是不精确的,对于这个,我最初的印象大致是来自c语言的这个代码

#define EPSILON 0.0000001 //根据精度需要
if(fabs(fa - fb) < EPSILON)
{
    printf("fa<fb\n");
}

那时候也只是会这么机械的去用,但是不清楚原理。而最近在使用浮点数运算的时候也遇到一些坑,就深入的看了一下。

首先是几个测试用例

0.7 + 0.1
81.6 * 100
0.7 - 0.43

用 Python 或者浏览器的 console 计算上面的这几个式子都不会得到正确的结果。
Python是使用双精度浮点数的,一个浮点数占用8个字节,也就是64位。第1 bit 位用来存储符号,决定这个数是正数还是负数,然后使用11 bit 来存储指数部分,剩下的52 bit 用来存储尾数。
浮点数能不能准确的显示和运算主要是和它转换为二进制后的数字位数有关。如果有位数小于64的二进制数,那么它肯定能准确的表示一个数字。相反,如果一个十进制数字需要超过64位的二进制来表示,那么肯定就是不准确的了,因为计算机只会存储64位二进制。比如说0.1的二进制形式其实是0.00011001100110011001100110011001100110011001100110011001100...

怎么算出来的?
十进制转换为二进制的原则是

  • 整数部分对 2 取余然后逆序排列
  • 小数部分乘 2 取整数部分, 然后顺序排列

2.25 的二进制表示是?
整数部分的二进制表示为 10, 小数部分我们逐步来算
0.25 * 2 = 0.5 整数部分取 0
0.5 * 2 = 1.0 整数部分取 1
所以 2.25 的二进制表示为 10.01

0.1 的表示是什么?
我们继续按照浮点数的二进制表示来计算
0.1 * 2 = 0.2 整数部分取 0
0.2 * 2 = 0.4 整数部分取 0
0.4 * 2 = 0.8 整数部分取 0
0.8 * 2 = 1.6 整数部分取 1
0.6 * 2 = 1.2 整数部分取 1
0.2 * 2 = 0.4 整数部分取 0
...
所以你会发现, 0.1 的二进制表示是 0.00011001100110011001100110011……0011
0011作为二进制小数的循环节不断的进行循环.

在 Python 中如果要进行浮点运算,一般使用 Decimal

>>> from decimal import Decimal
>>> Decimal('0.1') + Decimal('0.7')
Decimal('0.8')
>>> Decimal('81.6') * Decimal('100')
Decimal('8160.0')
>>> Decimal('0.7') - Decimal('0.43')
Decimal('0.27')

而前几天在 js 中遇到的就只能自己去特殊处理一下了。

//除法函数,用来得到精确的除法结果
//说明:javascript的除法结果会有误差,在两个浮点数相除的时候会比较明显。这个函数返回较为精确的除法结果。
//调用:accDiv(arg1,arg2)
//返回值:arg1除以arg2的精确结果
function accDiv(arg1, arg2) {
    var t1 = 0, t2 = 0, r1, r2;
    try {
        t1 = arg1.toString().split(".")[1].length
    } catch (e) {
    }
    try {
        t2 = arg2.toString().split(".")[1].length
    } catch (e) {
    }
    with (Math) {
        r1 = Number(arg1.toString().replace(".", ""));
        r2 = Number(arg2.toString().replace(".", ""));
        return (r1 / r2) * pow(10, t2 - t1);
    }
}

//给Number类型增加一个div方法,调用起来更加方便。
Number.prototype.div = function (arg) {
    return accDiv(this, arg);
};

//乘法函数,用来得到精确的乘法结果
//说明:javascript的乘法结果会有误差,在两个浮点数相乘的时候会比较明显。这个函数返回较为精确的乘法结果。
//调用:accMul(arg1,arg2)
//返回值:arg1乘以arg2的精确结果
function accMul(arg1, arg2) {
    var m = 0, s1 = arg1.toString(), s2 = arg2.toString();
    try {
        m += s1.split(".")[1].length
    } catch (e) {
    }
    try {
        m += s2.split(".")[1].length
    } catch (e) {
    }
    return Number(s1.replace(".", "")) * Number(s2.replace(".", "")) / Math.pow(10, m);
}

//给Number类型增加一个mul方法,调用起来更加方便。
Number.prototype.mul = function (arg) {
    return accMul(arg, this);
};

//加法函数,用来得到精确的加法结果
//说明:javascript的加法结果会有误差,在两个浮点数相加的时候会比较明显。这个函数返回较为精确的加法结果。
//调用:accAdd(arg1,arg2)
//返回值:arg1加上arg2的精确结果
function accAdd(arg1, arg2) {
    var r1, r2, m;
    try {
        r1 = arg1.toString().split(".")[1].length
    } catch (e) {
        r1 = 0
    }
    try {
        r2 = arg2.toString().split(".")[1].length
    } catch (e) {
        r2 = 0
    }
    m = Math.pow(10, Math.max(r1, r2));
    return (arg1 * m + arg2 * m) / m;
}
//给Number类型增加一个add方法,调用起来更加方便。
Number.prototype.add = function (arg) {
    return accAdd(arg, this);
};

//减法函数
function accSub(arg1, arg2) {
    var r1, r2, m, n;
    try {
        r1 = arg1.toString().split(".")[1].length
    } catch (e) {
        r1 = 0
    }
    try {
        r2 = arg2.toString().split(".")[1].length
    } catch (e) {
        r2 = 0
    }
    m = Math.pow(10, Math.max(r1, r2));
    //last modify by deeka
    //动态控制精度长度
    n = (r1 >= r2) ? r1 : r2;
    return ((arg2 * m - arg1 * m) / m).toFixed(n);
}

///给number类增加一个sub方法,调用起来更加方便
Number.prototype.sub = function (arg) {
    return accSub(arg, this);
};

其实还有一个地方没弄明白,就是二进制的加减乘除是怎么运算的,貌似在数字电路里面学过,但是全忘了~明天再看看

参考
http://www.darkof.com/2014/11/23/python-float/
http://www.programering.com/a/MDM3EzNwATE.html