Jan 7

Excel单元格高度调整+10 不指定

felix021 @ 2019-1-7 00:22 [IT » 软件] 评论(0) , 引用(0) , 阅读(1966) | Via 本站原创
使用Excel的过程中经常需要调整行的高度,由于各行的高度不同,统一设定高度往往不适用,而手动逐行调整比较麻烦。有一个常见的小技巧是先按Ctrl+A全选,然后再双击左侧数字标题栏的任意分割线,Excel会自动调整行高。

但是对于精神处女座的我来说,行与行之间没有间隔,所有字密密麻麻挤在一起有点受不了;但是excel又不像css里面可以一句话统一给单元格设置padding或margin(就没有这个属性)。

所幸还有很多其他精神处女座的同学,他们给出的方案 是用宏:

Sub AutoFitPlus()
  Dim rng As Range
  rowCount = 0
  Selection.EntireRow.AutoFit
  For Each rng In Selection.Rows
    rng.RowHeight = rng.RowHeight + 10
    rowCount = rowCount + 1
    If rowCount > 100 Then Break
  Next rng
End Sub


将这段代码保存为一个宏(可以设置一个快捷键,例如 Ctrl + Shift + L),选中某些行,再执行这个宏,就解决问题了。

Nov 23

浏览器客户端证书 不指定

felix021 @ 2018-11-23 23:06 [IT » 其他] 评论(0) , 引用(0) , 阅读(1739) | Via 本站原创
2015年,从某传统金融国企跳槽来到我司的时候,发现后台管理系统竟然需要安装客户端证书才能登陆,简直惊为天人,通过利用 https 的客户端认证,配合证书中嵌入的用户名做权限控制,把内部系统的入侵难度至少增加了一个量级(当然,安装证书的过程对于非技术线的同学说也麻烦了不少)。

后来发现,原来是把 github.com/OpenVPN/easy-rsa 这个项目包装了一下实现的,其实也并不是很困难。

今年年初因为新项目也需要这个方案,自己心血来潮,参考网上的一些说明,用 openssl 的 genrsa、req、x509、pkcs12 这几个命令试着自己颁发客户端证书,并且包装了一套脚本,勉强能用。

但当时没有太多时间,吊销的功能并没有做,因为比颁发证书麻烦多了,不只是敲几个命令,还需要一套更复杂的方案,包括维护一个证书信息列表、按一定规范的文件目录结构,以及DIY的 openssh 配置文件等。

最近抽了两个晚上把整个流程重新梳理了一遍,填了几个坑,终于做了一套完整的脚本出来,这才好意思写这篇博客介绍一下。

这套脚本可以在这里获取:

  https://github.com/felix021/openssl-selfsign

使用起来可以说是非常简单了:

1. 创建CA

  $ ./1-sign-site.sh dev.com

会创建 ca 证书,并在 cert/site/dev.com/ 下面创建 *.dev.com 的 https 证书,并且生成一个 nginx.conf 配置文件供参考(直接可以用的)。

2. 颁发客户端证书

  $ ./2-sign-user.sh test1

在 cert/newcerts/test1-01/ 下面创建 test1 用户的一个客户端证书 cert.p12 ,并给出对应的密码,双击按提示导入即可。

3. 参考第一步生成的 nginx.conf 配置文件,配置好 web 服务器,就行了。

4. 稳妥起见,应当在代码中读取 http 头里的 SSL_DN 参数,从中获取邮箱或者用户名来作为系统的用户名。

至于吊销的过程,要更复杂一些,可以参考该项目的 README 。
Nov 6
改 vimrc 没什么卵用,搜了一下,说是因为终端的兼容问题,只要在 ~/.bashrc 里面加上  "export TERM=linux" 就好。

refer: https://stackoverflow.com/questions/31783160/why-vim-is-changing-first-letter-to-g-after-opening-a-file
Sep 19
# 1. 什么是跳表

跳表(Skip List)是基于链表 + 随机化实现的一个有序数据结构,可以达到平均 O(logN) 的查找、插入、删除效率,在实际运行中的效率往往超过 AVL 等平衡二叉树,而且其实现相对更简单、内存消耗更低。

Redis 的 ZSET 底层实现就是用的 Skip List,这里是 [Antirez对此的说明](https://news.ycombinator.com/item?id=1171423)

这是一个典型的跳表:

[0] -> 0 -> 1 -> 3 -> 4 -> 5 -> 6 -> 7 -> 9 -> nil
[1] -> 0 ------> 3 ------> 5 ------> 7 ------> nil
[2]----------------------> 5-----------------> nil


解释一下:

1. SkipList 是一个多层的链表

2. 第[0]层的链表包含所有节点,其他层的链表包含部分节点,层次越高,节点越少

3. 每层链表之间会共享相同的节点(节省内存,但为了方便展示,每一层都输出了它的值)

4. 对于某个节点,在插入时通过概率判断它最高会出现在哪一层,并且也会出现在之下的每一层

通过这样的设计,当需要查找某个 key 时,可以从最高层的链表开始往前找,在这一层遇到末尾或者大于 key 的节点时往下走一个层,直到找到 key 节点。

例如:

引用
4 的查找路径为 [2] -> [1] -> 0 -> 3 -> 3@[0] -> 4
6 的查找路径为 [2] -> 5 -> 5@[1] -> 5@[0] -> 6
8 的查找路径为 [2] -> 5 -> 5@[1] -> 7 -> 7@[0] -> 9 (找不到)


# 2. 跳表的节点

从上面的描述,我们大概可以知道 (1) 每个节点需要保存一个 key; (2) 每个节点需要有多个next指针 (3) 其 next 指针的数量会在插入时确定

因此我们可以用下面这个 class 来表示节点:

class Node(object)
    def __init__(self, height, key):
        self.key = key
        self.next = [None] * height

    def height(self):
        return len(self.next)


# 3. 创建跳表

一个新创建的跳表是没有节点的。但为了实现的简单起见,可以添加一个头节点:

class SkipList(object):
    def __init__(self):
        self.head = Node(0, None) #头节点高度为0,不需要key


到目前为止都特别简单,但是还什么也干不了。

# 4. 创建节点

创建节点时,需要先按一定的概率分布确定其高度。

为了保证高层的节点比低层少,我们可以用这样的概率分布:

引用
Height(n) = p^n


实现其实非常简单:

import random

def randomHeight(self, p = 0.5):
    height = 1
    while random.uniform(0, 1) < p and self.head.height() >= height:
        height += 1
    return height


这样可以保证平均的路径长度是 log(n) 。

精确一点的话,实际上是 log(n-1, 1/p) / p,也就是说, p 的选择会影响跳表层数、平均路径长度。

具体的计算比较复杂,有兴趣可以参考跳表的原论文《Skip Lists: A Probabilistic Alternative to Balanced Trees》。(TL;DR)

然后我们就可以这样来创建一个新的节点:

    node = Node(self.randomHeight(), key)

# 5. 添加节点

如果只是为空跳表添加一个新的节点,只要更新头结点的每一个next指针:

def insertFirstNode(self, key):
    node = Node(self.randomHeight(), key)
    while node.height > self.head.height():
        self.head.next.append(None) #保证头节点的next数组覆盖所有层次的链表

    for level in range(node.height()):
        node.next[level] = self.head.next[level]
        self.head.next[level] = node


但很显然这个方法只能用一次。

如果跳表中已经有多个节点,那我们就必须找到每一层中适合插入的位置:

def getUpdateList(self, key):
    update = [None] * self.head.height()
    for level in range(len(update)):
        x = self.head
        while x.next[level] is not None and x.next[level].key < key:
            x = x.next[level]
        update[level] = x
    return update


这个函数返回一个 update 节点数组,其中的每个节点都是在这一层中小于 key 的最后一个节点。

也就是说,在 level = i 层,总是可以把新的节点插入 update[i] 之后:

def insert(self, key):
    node = Node(self.randomHeight(), key)
    while node.height > self.head.height():
        self.head.next.append(None) #保证头节点的next数组覆盖所有层次的链表

    update = self.getUpdateList(key)
    next0 = update[0].next[0]
    if next0 is not None and next0.key == key:
        return # 0层总是包含所有元素;如果 update[0] 的下一个节点与key相等,则无需插入。

    for level in range(node.height()):
        node.next[level] = update[level].next[level]
        update[level].next[level] = node


但是由于这一版 getUpdateList 是 O(n) 的,插入效率并没有达到跳表的设计目标。

# 6. 添加节点++

考虑这一点:跳表的每一层都是有序的。

也就是说,我们在找到 update[n] = x 以后,其实可以从节点 x 的 n - 1 层继续查找 update[n-1] 应该是哪个节点。

由于查找路径的平均长度是 log(N) ,所以我们可以实现一个更快的 getUpdateList 方法

注意,需要从最高层开始查

def getUpdateList(self, key):
    update = [None] * self.head.height()
    x = self.head
    for level in reversed(range(len(update))):
        while x.next[level] is not None and x.next[level].key < key:
            x = x.next[level]
        update[level] = x
    return update


# 7. 里程碑1

把上面的代码整合起来,我们就可以得到第一版跳表代码:能够插入节点。

为了更好地展示我们的成果,我们可以用这样一个函数,把链表按第1节的例子样式输出:

def dump(self):
    for i in range(self.head.height()):
        sys.stdout.write('[H]')
        x = self.head.next[0]
        y = self.head.next[i]
        while x is not None:
            s = ' -> %s' % x.key
            if x is y:
                y = y.next[i]
            else:
                s = '-' * len(s)
            x = x.next[0]
            sys.stdout.write(s)
        print ' -> <nil>'
    print


试试看:

sl = SkipList()
for i in range(10):
    sl.insert(sl)
    s1.dump()



[H] -> 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> <nil>
[H]----- -> 1 -> 2 -> 3---------- -> 6 -> 7---------- -> <nil>
[H]---------- -> 2-------------------- -> 7---------- -> <nil>


多尝试几次,以及选择不同的 p 值,可以观察生成跳表的区别。

# 8. 查找节点

实际上查找节点的过程,已经包含在 insert 的实现里了:

def find(self, key):
    update = self.getUpdateList(key)
    if len(update) == 0:
        return None

    next0 = update[0].next[0]
    if next0 is not None and next0.key == key:
        return next0 # 0层总是包含所有元素;如果 update[0] 的下一个节点与key相等,则无需插入。
    else:
        return None


# 9. 删除节点

既然已经能找出 update 节点数组,在 level = i 层,只要判断 update[i].next[i] 是否等于要删除的 key 就可以了:

def remove(self, key):
    update = self.getUpdateList(key)
    for i, node in enumerate(update):
        if node.next[i] is not None and node.next[i].key == key:
            node.next[i] = node.next[i].next[i]


# 10. 里程碑2

整合 find 和 update 数组,就可以实现跳表的基础操作了,试试看:

node = sl.find(3)
print node

for i in range(7, 14):
    sl.remove(i)
    sl.dump()


# 11. 其他

我们在 Node 中只添加了一个 key 属性,在具体的实现中,我们往往可能需要针对 key 存储一个 value,例如 Python 自带的 dict 实现。改造起来也很简单:

1. node 中添加一个 value 属性,并且添加相应的初始化逻辑(__init__方法)

2. 将 SkipList.insert 修改为 `insert(self, key, value)`,在新建 Node 时指定其 value

3. 再添加一个 `update(self, key, value)` API,方便调用方的使用

4. 可以考虑针对语言适配,例如实现 python 的 __getitem__ 、 __setitem__ 等魔术方法


# 12. 完整代码

#coding:utf-8

import random

class Node(object):
    def __init__(self, height, key=None):
        self.key = key
        self.next = [None] * height

    def height(self):
        return len(self.next)

class SkipList(object):
    def __init__(self):
        self.head = Node(0, None) #头节点高度为0,不需要key

    def randomHeight(self, p = 0.5):
        height = 1
        while random.uniform(0, 1) < p and self.head.height() >= height:
            height += 1
        return height

    def insert(self, key):
        node = Node(self.randomHeight(), key)
        print node.height(), node.key
        while node.height() > self.head.height():
            self.head.next.append(None) #保证头节点的next数组覆盖所有层次的链表

        update = self.getUpdateList(key)
        if update[0].next[0] is not None and update[0].next[0].key == key:
            return # 0层总是包含所有元素;如果 update[0] 的下一个节点与key相等,则无需插入。

        for level in range(node.height()):
            node.next[level] = update[level].next[level]
            update[level].next[level] = node


    def getUpdateList(self, key):
        update = [None] * self.head.height()
        x = self.head
        for level in reversed(range(len(update))):
            while x.next[level] is not None and x.next[level].key < key:
                x = x.next[level]
            update[level] = x
        return update

    def dump(self):
        for i in range(self.head.height()):
            sys.stdout.write('[H]')
            x = self.head.next[0]
            y = self.head.next[i]
            while x is not None:
                s = ' -> %s' % x.key
                if x is y:
                    y = y.next[i]
                else:
                    s = '-' * len(s)
                x = x.next[0]
                sys.stdout.write(s)
            print ' -> <nil>'
        print

    def find(self, key):
        update = self.getUpdateList(key)
        if len(update) == 0:
            return None

        next0 = update[0].next[0]
        if next0 is not None and next0.key == key:
            return next0 # 0层总是包含所有元素;如果 update[0] 的下一个节点与key相等,则无需插入。
        else:
            return None

    def remove(self, key):
        update = self.getUpdateList(key)
        for i, node in enumerate(update):
            if node.next[i] is not None and node.next[i].key == key:
                node.next[i] = node.next[i].next[i]


完。
Sep 6
excel很强大,但也有非常蠢的地方:比如今天遇到的,导出文档的日期列是“文本”格式,这时候用数据透视表,excel不能识别这是日期,于是无法根据月或者年对数据进行聚合。

即使选中整列,然后将格式全都修改为日期也不行。

即使再弄一列格式为日期的,然后用黏贴数值也不行。

按照过去的经验,只有逐个格子双击,然后回车,才能把格式应用到数据上,真是蠢到爆炸。

今天觉得实在不能忍了,放狗搜了下“excel apply format instead of double click on each column”,总算找到一个解决方案:

1. 选中该列

2. 在“数据”Tab里点击“分列”(按格式将单列文本拆分成多列,英文版是 Text To Columns)

3. 点击完成

搞定
Aug 3

方法论 不指定

felix021 @ 2018-8-3 03:49 [随想] 评论(2) , 引用(0) , 阅读(3000) | Via 本站原创
1. 意志力

    有些人似乎天生就有很强的意志力,可以长时间连续做某件事情而不知疲倦,例如学霸可以长时间做题,计算机大神可以长时间编写代码(看看那些 Hackathon 比赛)。但我认为这里有一些事实被掩盖了——你回顾一下,是不是也有一些自己可以长时间做而不会疲倦的事情,例如王者荣耀、吃鸡,又或者刷各种美剧?你也许会觉得,这不一样啊,玩游戏和刷剧是这种有意思的事情,并不需要意志力的支撑。问题就在这里——可能很多人无法理解的是,学霸们和计算机大神们可以从做题、写码中获得快乐,所以才会觉得是强大的意志力在支撑着他们。

    在能得到快乐的事情上,持续的投入不需要强大意志力的支撑。反过来说,在不是特别感兴趣甚至令人反感的事情上,意志力就非常重要。不巧的是,意志力是一种有限的资源,合理、有效地利用这个资源,我认为是取得成绩的一个关键因素。

    那该怎么办呢?鸡汤地说,就是想办法让自己喜欢上要做的事情,如果做不到,至少也应该减少排斥感。当然,给汤不给勺是耍流氓,所以请继续往下看。

2. 反馈(奖励)

    很多人都对嗑瓜子这种带有魔性的娱乐活动深有体会,拿着一袋瓜子,不嗑完根本停不下来。如果不巧你不喜欢嗑瓜子,可以想想那些让你觉得非常爽的游戏,无论具体的游戏形式如何呈现,都会有一个共性,就是总会给你立即的回馈。游戏总会给完成挑战的玩家一些奖励,激励他们继续玩下去。好玩的游戏尤其在意回馈的呈现形式,比如施放一个技能产生的光影声音特效、对手被攻击掉血的数值化呈现,杀死一个对手后增加经验值、掉下来的装备,完成一个剧情任务以后的礼包奖励等等,都希望够给玩家创造充分的满足感,引导玩家继续完成下一个目标。

    人类本质上还是一种动物,大脑中对反馈的本能反应,相当于给游戏的设计者开了一个后门,用于操纵玩家的行为。利用好这个特点,即使在本来并不十分感兴趣的领域,也能够显著改善对意志力的消耗。举例而言,我总是会边写边运行我的代码,一方面可以确认写好的代码没有错误(避免把一大堆问题积攒到最后),另一方面运行结果本身是一个有效的反馈(每次运行结果都更完整、更接近目标),激励我继续改进;而且在学习数据结构时(例如一个二叉树),我总会写一个 dump 方法,用来把内存中抽象的数据可视化。

3. 目标

    游戏设计的“心流(flow)”理论给出的几个关键因素中,我认为反馈是最重要的,其次是目标。好的游戏总会设定合理的目标,引导玩家一步一步入坑。太简单的目标会让人觉得无聊,太难的目标则很容易让人因畏难而放弃。就像下棋/打球,和新手、或水平远高于自己的选手对战,都不是一件愉快的事情,而让人既觉得愉快又能提高自己水平的,正是与水平相当(或者略高)的人对战,获胜的可能性(不确定性)和对获胜的追求,会带来双方激烈的对抗,而这些对抗能给参与者淋漓酣畅的快意(反馈),从而形成有效的正反馈循环。

    同样地,在学习过程中,给自己设定合理的目标能够有效降低对意志力的消耗。如何设置合理的目标,可能没有通用的方法,但幸运的是,可以在实践中调整,找到最适合自己的阈值。

4. 启动成本

    俗话说万事开头难,如果能想办法减少开头的困难,那万事就不那么难了。所谓的困难,换种说法就是成本高。这里的成本是广义的,不仅仅包含金钱成本,也包含时间成本,以及其他为了达成目标所要付出的代价。而这里的目标还只是开始做某件事情,如果启动成本过高,在开始之前就会消耗大量意志力,那么结果便可想而知。

    一个典型的例子是健身。我曾经的一份工作,公司隔壁是一个专业大型健身房,提供浴巾、拖鞋等各种洗浴用品,公司发放免费的年卡,于是我每周都会有2~3次午休或下班时间“说走就走”地去健身,甚至还请了教练来辅导我,过程虽然有些辛苦,但回想起来确实很享受那段时光。换工作以后,我也曾经办过一次季度健身卡,但是启动成本陡增:我需要自掏腰包来办理健身卡,而且这家健身房离我有十几分钟脚程、不提供浴巾和拖鞋等,每次去健身房之前心里都一番挣扎,结果是,虽然我坚持了一周2次的频率,但季度卡到期后就没再续费。我做了另一个决定:在家里腾出一小块地方,买了一架椭圆机,只要有15分钟空隙就可以完成一次轻松的健身。如果我愿意,还可以开启刷剧模式,很愉快地在椭圆机上磨蹭大半个小时。

    降低启动成本,其本质就是想办法制定一个较容易达成的初始目标(启动)。硅谷创业家 Eric Rise 所著《精益创业》(Lean Startup)的核心思想“最小化可行产品”(Minimum Viable Product, MVP),我认为也是最小化启动成本的一个应用,通过最小代价开发出一个可用的产品(达成目标:启动),推向市场,验证其可行性(反馈),根据市场反馈持续不断地迭代改进(阶段性目标),形成正反馈循环。

通过降低启动成本,设置适合自己的目标,建立能够带来奖赏的反馈机制,形成正反馈的循环,从而最大化地利用好意志力资源,这就是我的方法论。

如果不够的话,还有一个开挂的技巧——研究表明,吃糖可以在一定程度上补充意志力。也对,要是在健身减肥的过程中可以吃甜品,应该就没那么难坚持下来了吧?
Jun 8
注:这篇是3月初在公司内部平台上发布的,搬一份到 blog 存档。
===


我注意到过去几个月有些同学还在踩一个简单的分布式事务Case的坑,而这个坑我们在两年以前就已经有同学踩过了,这里简单解析一下这个case和合适的处理方案,供各位参考。

# 1. 踩过的坑

这个case有很多变种,先说说我们在 X 业务踩过的坑开始,大约是16年9月,核心业务需求是很简单的:在用户发起支付请求的时候,从用户的银行卡扣一笔钱。负责这个需求的同学是这么写的代码(去除其他业务逻辑的简化版):
$dbTrans = $db->beginTransaction();
try {
    $order = PayRequest::model()->newPayRequest(...); #在数据库中插入一条支付请求记录,状态为待支付
    //其他业务改动
    $result = PaySvr::pay($order->id, $order->amount); #请求PaySvr(或第三方支付通道)扣款
    if ($result['code'] == PaySvr::E_SUCCESS) {
        $order->setAsSucceeded();
    } else {
        $order->setAsPending();
    }
    $dbTrans->commit();
} catch (Exception $e) {
    $dbTrans->rollback();
}

乍一看好像是没有什么毛病,测试的case都顺利通过,也没有人去仔细review这一小段代码,于是就这么上线了。但问题很快就暴露出来,PaySvr在支付成功以后尝试回调,业务系统报错”订单不存在”。查询线上日志发现,这笔订单在请求第三方支付通道时网络超时,Curl抛了timeout异常,导致支付记录被回滚。有心的同学可以自己复现一下这个问题,观察BUG的发生过程。

代码修复起来倒是很简单,在请求PaySvr之前提交事务,将支付请求安全落库即可。
$dbTrans = $db->beginTransaction();
try {
    $order = PayRequest::model()->newPayRequest(...);
    $order->setAsPending();
    //其他业务改动
    $dbTrans->commit(); #先将支付请求落地
} catch (Exception $e) {
    $dbTrans->rollback();
}

#再请求PaySvr
$result = PaySvr::pay($order->id, $order->amount);

#根据PaySvr结果修改支付请求和其他业务记录的状态
$dbTrans = $db->beginTransaction();
try {
    if ($result['code'] == PaySvr::E_SUCCESS) {
        $order->setAsSucceeded();
        //其他业务改动
    } elseif ($result['code'] == PaySvr::E_FAIL) {
        $order->setAsFailed();
        //其他业务改动
    } else {
        //等待后续cron补单
    }
    $dbTrans->commit();
} catch (Exception $e) {
    $dbTrans->rollback();
}

把这个实现代入多个不同的业务下,还会衍生出更多问题,比如被动代扣业务,就可能因为重试导致用户被多次扣款,引起投诉(支付通道对投诉率的要求非常严格,甚至可能导致通道被关停);更严重的是放款业务,可能出现重复放款,给公司造成直接损失。据说某友商就是因为重复放款倒闭的,所以请各位同学在实现类似业务时特别注意,考虑周全。

# 2. 归纳总结

我们往后退一步再审视这个case,这段简单的代码涉及了两个系统:X 业务系统(本地数据库)、PaySvr(外部系统)。可以看得出这段代码的本意,是期望将当前系统的业务和外部系统的业务,合并到一个事务里面,要么一起成功提交,要么一起失败回滚,从而保持两个系统的一致性。

之所以没能达到预期,直接原因是,在失败(异常)回滚的时候,只回滚了本地事务,而没有回滚远端系统的状态变化。按这个思路去考虑,似乎只要加一个 PaySvr::rollbackRequest($order->id) 好像就可以解决问题。但仔细想想就会发现远没这么简单,从业务上来说,如果已经给用户付款了,那实际上就是要给用户退款,而往往这时候是掉单(支付请求结果未知),我们又无法直接给用户退款;更极端一点,如果这个rollback请求也超时了呢,那本地可以rollback吗?

这就是分布式事务棘手的地方了,只靠这样的逻辑是无法保证跨系统的一致性的。解决这个问题的方法是引入两段式提交(2 Phase Commit,常常简写为2PC),其基本逻辑是,在两个系统分别完成业务,然后再分别提交。

例如我们上面的解决方案,实际上就是2PC的一个实现:我们把业务需求作为一整个事务,它可以拆成两个子事务(第三方支付通道完成代扣,在业务系统记录支付请求成功并修改相应业务状态),每个子事务又分成两个阶段:第一阶段,是在本地先记录支付请求(状态为待确认),并向第三方支付发出代扣请求(结果不确定);第二阶段,在确认第三方代扣成功以后,修改本地支付请求的状态修改为成功,或者是代扣结果为失败,本地支付请求状态记为失败。两个阶段都完成,这个事务才是真的完成了。

# 3. Case变种

仔细思考我们曾经实现过的需求,可能会在很多看似不起眼的地方发现分布式事务,例如我们在存管匹配系统里面,就有这样一个Case。

由于与银行存管系统交互的延迟比较大,所以我们的匹配系统实现是异步的,匹配系统在撮合了资金和资产以后,会生成一条债权关系记录在本地,随后再发送到银行系统执行资金的划拨。为了提高执行的效率,我们希望在债权关系生成以后,尽快执行资金的划拨,因此我们会把资金划拨的指令通过LPush放进Redis的list里;List的另一端,那些使用BLPOP监听数据的worker会立刻被激活去执行。

如果没有仔细思考,代码可能会这么写:
#匹配系统
function matcher() {
    $dbTrans = $db->beginTransaction();
    try {
        foreach (matchCapitalAndProject() as $match_result) {
            list($capital_id, $project_id, $amount) = $match_result;
            $relation = Relation::model()->create($capital_id, $project_id, $amount);
            $redis->lPush($relation->id);
        }
        $dbTrans->commit();
    } catch (Exception $e) {
        $dbTrans->rollback();
    }
}

#Worker
function Worker() {
    while (true) {
        $id = $redis->brPop();
        $relation = Relation::model()->findByPk($id);
        if ($relation) {
            HengfengApi::invest($relation->capital_id, $relation->project_id, $amount);
        }
    }
}

在实际执行这段代码的时候,如果没有仔细测试(尤其是在有补单逻辑,捞出未执行成功的划拨指令再发送给银行),可能就不会发现,实际上有很多指令并不是马上被执行的,因为relation_id被送进list以后,worker马上就会读出来执行,但这时事务可能还没有提交。但这只是影响了业务的效率,还没有对业务的正确性产生影响。


为了修复这个问题,似乎可以这么做:把 [capital_id, project_id, amount] 发送到redis,worker直接取出执行,这样就不用从数据库读取relation,保证尽快将请求发送到银行。但如果因为某些原因,事务最终没有被提交呢?找银行rollback这些指令的执行,那就麻烦多了。

正确的做法是,在事务提交了以后,再lPush到Redis里:
#匹配系统
function matcher() {
    $arr_relation = [];
    $dbTrans = $db->beginTransaction();
    try {
        foreach (matchCapitalAndProject() as $match_result) {
            list($capital_id, $project_id, $amount) = $match_result;
            $relation = Relation::model()->create($capital_id, $project_id, $amount);
            $arr_relation[] = $relation;
        }
        $dbTrans->commit();
    } catch (Exception $e) {
        $dbTrans->rollback();
        $arr_relation = []; #清空,避免被push到队列里
    }
    foreach ($arr_relation as $relation) {
        $redis->lPush($relation->id);
    }
}
注:foreach要放到try-catch后面。


最后想补充一点,相信有很多同学知道这个Case,或者就算不知道也不会犯这样的错误,因此也许会觉得没必要专门揪出来这样分享 —— 但“知识的诅咒”就是这样,“我会的东西都是简单的”,然而对于没有踩过坑的同学来说,其实都是宝贵的经验;另一方面,有些别人觉得简单的问题、踩过的坑,也许自己是不知道的。所以,希望大家都能分享自己在工作学习中踩过的坑、解决过的问题,互相交流,互相提高。
Mar 29

浅析嵌套数据库事务 不指定

felix021 @ 2018-3-29 22:00 [IT » 数据库] 评论(0) , 引用(0) , 阅读(9371) | Via 本站原创

大家都知道,数据库事务提供的强一致性,让我们只需要在业务开始之前执行begin、结束后执行commit,并在异常的情况下执行rollback,就能够保证业务数据的强一致性。

## 1. 转一笔账

以一个转账操作为例,从from账户往to账户转一笔钱,涉及到两个账户的操作,我们用事务来保证数据的一致性:
function actionTransfer(Account $from, Account $to, int $amount)
{
    $this->db->begin();
    try {
        $from->withdraw($amount);
        $to->deposit($amount);
        $this->db->commit();
    } catch (Exception $e){
        $this->db->rollback();
    }   
}

class Account extends Model
{
    protected function changeBalance($amount)
    {
        $this->db->exec(
            "update account set balance = balance + :amount where id = :id",
            ['amount' => $amount, 'id' => $this->id]
        );
        AccountLog::create($this->id, $amount); #创建一条账户流水
    }
    public function withdraw($amount)
    {
        $this->changeBalance(-$amount);
    }
   
    public function deposit($amount)
    {
        $this->changeBalance($amount);
    }
}

如上面的栗子所见,我们往往会把数据库操作封装到底层模块(常常是model)里,对上层提供API,这样既保证了上层业务的可读性,也使得代码的可复用性提高了。

## 2. 充一笔钱,踩一个坑

说到复用性,那就是说,可能会有另一个业务需要调用它,比如充值,这个业务很简单,只涉及到一个账户,似乎可以简单点:
function actionDeposit(Account $account, int $amount)
{
    $account->deposit($amount);
}

这样看起来好像也没啥问题,对吧?但细心的小伙伴可能注意到了,代码里的 `changeBalance` 函数挖了个坑:在修改完账户余额以后会创建一条流水记录。如果没有开启事务的话,极端情况下会出现账户余额改好了,却没记录流水的情况,这就尴尬了。

当然这里只要在 `actionDeposit` 里开启了事务就能解决这个问题,但是基于防御性编程的习惯,`changeBalance` 这个方法本身作为一个独立的业务,应当尽最大努力确保其内业务的一致性,更直白一点说,就是也开启一个事务:
    protected function changeBalance($amount)
    {
        $this->db->begin();
        try {
            $this->db->exec("update account set balance ...", ...);
            AccountLog::create($this->id, $amount);
            $this->db->commit();
        } catch (Exception $e) {
            $this->db->rollback()
            throw $e; //确保上层业务知晓发生了异常
        }
    }

我们期望每一段代码都像这样对自己的业务负责,那么程序员不管在哪一层写代码,都能写得放心。

## 3. 嵌套事务

但到目前为止还没有到到期望的这么美好:如果只是在同一个流程里连续使用  begin ,实际上只会启动一个事务,遇到第一个 commit 就会结束事务,后续如果出现异常,rollback 无法达到预期的效果:
引用

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> update account set balance = balance + 1 where id = 1;
Query OK, 1 row affected (0.01 sec)

mysql> begin;
Query OK, 0 rows affected (0.01 sec)

mysql> update account set balance = balance - 1 where id = 2;
Query OK, 1 row affected (0.01 sec)

mysql> commit;
Query OK, 0 rows affected (0.00 sec)

mysql> rollback;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from account;
+----+---------+
| id | balance |
+----+---------+
|  1 |    1001 |
|  2 |    999 |
+----+---------+

还好的是,MySQL提供了嵌套事务的支持,可以使用其 SavePoint 特性来达到这个效果:
引用

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> update account set balance = balance + 1 where id = 1;
Query OK, 1 row affected (0.00 sec)

mysql> savepoint level1;
Query OK, 0 rows affected (0.00 sec)

mysql> update account set balance = balance - 1 where id = 2;
Query OK, 1 row affected (0.00 sec)

mysql> release savepoint level1;
Query OK, 0 rows affected (0.00 sec)

mysql> rollback;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from account;
+----+---------+
| id | balance |
+----+---------+
|  1 |    1000 |
|  2 |    1000 |
+----+---------+

利用这一点,我们就可以写出一个支持嵌套事务的数据库模块:
class Database
{
    protected $level = 0;
    public function begin()
    {
        if ($this->level == 0) {
            $this->exec("begin");
        } else {
            $this->exec("savepoint level{$this->level}");
        }
        $this->level += 1;
    }

    public function commit()
    {
        $this->level -= 1;
        if ($this->level == 0) {
            $this->exec("commit");
        } else {
            $this->exec("release savepoint level{$this->level}");
        }
    }

    public function rollback()
    {
        $this->level -= 1;
        if ($this->level == 0) {
            $this->exec("rollback");
        } else {
            $this->exec("rollback to savepoint level{$this->level}");
        }
    }
}

当然,对于不支持 savepoint 特性的DBMS,也不是不能解决这个问题,只要在 `level != 0` 的时候不执行 commit 或者 rollback 就行,只是无法达到 savepoint 的效果。

加上这个改动的具体实现这里就不贴了,感兴趣的小伙伴建议自己动手写写看,实在太懒也可以查看 Yii2.0 中`yii\db\Transaction` 的实现 —— 其实上面这段代码就是对 Yii2.0 实现的一个简化版本。喜欢 Laravel 的小伙伴,在 `Illuminate\Database\Concerns\ManagesTransactions` 也能找到类似的应用。

## 4. 找坑和填坑

我们实现的这个嵌套事务方案,使得每一部分代码都能够健壮地实现其业务。但是,在具体编码过程中会常常还会遇到两个大坑:坑一是 begin 和 commit/rollback 不配对,坑二是出了错忘记反馈给上层。在上面的实现里,要避开这两个坑,完全依赖程序员的编码习惯。但墨菲定律告诉我们……说多了都是泪,总之就是“依赖人来保证正确性”这件事情永远是不靠谱的。

那怎么改变这一点呢?Laravel 的解决方案是由代码本身来填坑,所以实现了 `ManagesTransactions->transaction` 这个方法(以下略作简化,省略了失败重试机制及相应的死锁处理,这一部分建议详读 laravel 的相关代码):
    public function transaction(Closure $callback)
    {
        $this->begin();
        try {
            $value = $callback();
            $this->commit();
            return $value;
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }

于是程序员们就可以这么干了:
public function actionTransfer($from, $to, $amount)
{
    DB::transaction(function () use ($from, $to, $amount) {
        $from->withdraw($amount);
        $to->deposit($amount);   
    })
}

当然,这个方案也不是完全没代价的,其中最重要的是,程序员们要严格遵守一个基本约定:**遇到错误的时候抛出异常**,以便封装层自动执行rollback。虽然也是约定,但是不像前面那两个约定那么晦涩,应该更容易遵守吧。

另外,如果在rollback之前还希望做一些清理操作(包括打log之类的),需要在callback里面再包一层 try-catch ;但如果遇到 commit 失败这种极端情况,清理操作仍无法被执行。这可以通过一个更复杂的约定(比如返回一个包含清理操作的 $clean_callback),但鉴于实际上不太有这种需要,得不偿失,所以大概就这样吧。

这种方案,在 C++ 里面有很多类似的例子,甚至还有一个专有名词 RAII (Resource Acquisition Is Initialization),“资源获取即初始化”,念起来有点拗口,其逻辑是在获取一个资源的时候立即初始化,并且在资源生命周期结束的时候自动销毁,避免因为程序员遗忘导致非预期的异常。常见的应用场景,包括打开文件(自动关闭)、多线程并发获取互斥锁(自动解锁)、申请内存资源(自动释放)等等。

如果你觉得看明白了,不妨试着写一个文件锁(flock)的代码练练手?
分页: 7/102 第一页 上页 2 3 4 5 6 7 8 9 10 11 下页 最后页 [ 显示模式: 摘要 | 列表 ]