织网

身体和灵魂,总有一个在路上

HTTPServer 优雅关闭

| Comments

和同事聊到了服务在需要关闭的时候该如何优雅退出,顺藤摸瓜挖掘了Go1.8的特性。Go 1.8起新增了优雅退出 HTTPServer 的特性,也就是大家经常提到的 GraceFul ShutDown。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// src/net/http/server.go
// Shutdown gracefully shuts down the server without interrupting any active connections. Shutdown works by first closing all open listeners, then closing all idle connections, and then waiting indefinitely for connections to return to idle and then shut down. If the provided context expires before the shutdown is complete, then the context's error is returned.

func (srv *Server) Shutdown(ctx context.Context) error {
    atomic.AddInt32(&srv.inShutdown, 1)
    defer atomic.AddInt32(&srv.inShutdown, -1)

    srv.mu.Lock()
    lnerr := srv.closeListenersLocked()
    srv.closeDoneChanLocked()
    srv.mu.Unlock()

    ticker := time.NewTicker(shutdownPollInterval)
    defer ticker.Stop()
    for {
        if srv.closeIdleConns() {
            return lnerr
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

从文档注释得知,server.Shutdown 首先关闭所有 active 的 listener,以及所有处于 idle 状态的 Connections,然后无限等待那些处于 active 状态的 connection 变为 idle 状态后,关闭他们,Server退出。

如果有一个 Connection 依然处于 active 状态,那么 server 将一直 block 在那里。 Shutdown 接受一个 Context 参数,调用者可以通过 Context 传入一个等待的超时时间。 一旦超时,Shutdown 将直接返回。对于仍然处理 active 状态的Connection,就无能为力了。 所以 Shutdown 超时时间尽量要比链接处理的时间长。

了解完原理,我们用例子来感受一下这个特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"
	"github.com/gin-gonic/gin"
)

func main() {
	router := gin.Default()
	router.GET("/", func(c *gin.Context) {
		time.Sleep(3 * time.Second)
		log.Printf(http.StatusOK, "Handle request success")
	})

	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Printf("listen: %s\n", err)
		}
	}()

	quit := make(chan os.Signal)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("Shutdown Server ...")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server Shutdown:", err)
	}
	log.Println("Server exist")
}

代码中,每个请求都等待3秒才完成,使用信号来捕捉程序退出。退出时,HTTPServer 等待5秒来”善后”。我发起curl localhost:8080来测试,随即按下 Ctrl+C 退出程序,结果显示,服务器坚持在处理完这个 HTTP 请求才退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:	export GIN_MODE=release
 - using code:	gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /                         --> main.main.func1 (3 handlers)
Handle Request success
[GIN] 2017/07/12 - 20:30:47 | 200 |  3.000385597s | 127.0.0.1 |   GET     /
^C  //终端输入Ctrl+C
Shutdown Server ...
listen: http: Server closed
Handle Request success //在接收到关闭信号时,依然保证正在处理的请求正常处理完
[GIN] 2017/07/12 - 20:30:53 | 200 |  3.000360362s | 127.0.0.1 |   GET     /
Server exist

MIT 6.824 MapReduce

| Comments

学习 MIT 6.824 Lab1 MapReduce,做下笔记

MapReduce 的思路

  • 把数据分成 M 份,每一份叫做 Mi
  • 启动一个 master 对象,由它来控制如何分配调控
  • master 挑出一个 worker,对 Mi 执行 map 操作,返回一个 KV 数组
  • 然后把 KV 数组分成 nReduce 份存在本地,等待 Reduce 操作。当 map 全部完成后,每个 Mi 产生 nReduce 份结果,每一个叫做 Ri。文件名:mrtmp-JobName-Mi-Ri 其中Mi Ri 分别表示数字,因此这一步会产生 M * nReduce 份文件。
  • 从每个 Mi 中选择一份 Ri。然后根据 Key 排序,把相同 Key 的 Value 合在一起,生成 Key /list(value)
  • 开始 Reduce,输入list(value),最后会生成 R 份文件 mrtmp.JobName-res-Ri
  • 最后 Merge 成一个文件。

作业步骤

  • Part1 完成 doMap 和 doReduce。doMap 完成3,4两个步骤. doReduce 完成5,6两个步骤。
  • Part2 实现 main/wc.go 在Part1的基础上完成函数调用而已。
  • Part3 把 map 和 reduce 的操作变成异步。用到了RPC,用channel 来实现并发控制。

代码笔记

源码

  • common.go 11-32行:可变参数打印日志,这个方法与C语言常用的类似
  • common_reduce.go Line75:sort.strings 对字符串切片排序
  • commo_rpc.go Line59:rpc调用方法
  • master.go Line95-99:当mr.newCond.Broadcast()被调用,此处就被唤醒,否则一直阻塞,mr.wait()所在的逻辑分支才会被唤醒,否则继续阻塞
  • master.go Line15-16:匿名参数,表示 Master 具有sync.Mutex的接口, 因而 Master 也能调用sync.Mutex的函数. 所以当调用 master.Lock()的时候也不足为奇
  • master_rpc.go Line14,Line37:chanel 被close的时候,case <- shutdown 也就被触发了

双端链表

| Comments

Nginx, Redis 项目中,均使用上了双链表。

  • 列表类型的键进行操作(RPUSH, LPOP)时,程序底层操作用的是双端链表。 源码

  • Nginx 典型应用是在连接池中。Nginx 会处理大量的 socket 连接,为提高并发处理链接的能力,引入了连接池,其实现这个连接池用到了双链表。源码.

对于双端链表,教科书上曾有提及,但如今映像并不深刻,再度理解并实践一次。

结构定义与初始化

1
2
3
4
5
6
7
8
9
10
typedef struct Node {
    int num;
    struct Node * next; //前继指针
    struct Node * pre;  //后继指针
} Node;

typedef struct dlist {
    Node * head; //双链表的指向头结点的元素
    Node * tail; //双链表指向尾部节点的元素,方便从后检索
} dlist;

初始化的双链表,head,tail 均为空。

插入操作

插入操作,是按照递增的顺序插入,涉及到双链表的更改,因此传参是指向链表的指针,分三种情况

  • 空链表插入头元素,head 与 tail 节点均要修改
  • 在链表尾部插入节点,要变动 tail 指针
  • 中间插入节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int insertDlist(dlist **list, int num) {
    Node * head = (*list) -> head;
    Node * tail = (*list) -> tail;

    if (list == NULL) return -1;

    Node * node = initNode(num);
    if (node == NULL) return -1;

    // empty dlist
    if ((*list) -> head == NULL && (*list) -> tail == NULL) {
        (*list)-> head = node;
        (*list)-> tail = node;
        return 0;
    }

    while (head -> next && head -> num < num) {
        head = head -> next;
    }

    // at the end
    if (head->next == NULL) {
        head -> next = node;
        node -> pre = head;
        tail -> pre = node;
        return 0;
    } else {
        // in the middle
        node-> next = head -> next;
        head -> next -> pre = node;
        node -> pre = head;
        head -> next = node;
        return 0;
    }
}

删除操作

删除操作,要涉及到双链表的更改,因此传参是指向链表的指针,分三种情况

  • 删除头结点,
  • 删除尾节点
  • 删除中间节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static int deleteDlist(dlist ** list, int location) {
    Node * head = (*list) -> head;
    Node * now = NULL;
    Node * last = NULL;

    if (head == NULL)
        return -1;
    if (location <= 0 || location > numOfDlist(*list))
        return -1;

    if (location == 1) {
        now = head;
        (*list) -> head = now ->next;
        head -> next ->pre = NULL;
        if (now) {
            free(now);
            now = NULL;
        }
        return 0;
    }
    int num = 0;
    while (head && num++ < location) {
        head = head -> next;
    }

    if (head -> next == NULL) {
        now = (*list) -> tail;
        head -> pre -> next = NULL;
        now -> pre = head->pre;
        if (head) {
            free(head);
            head = NULL;
        }
    } else {
        now = head -> next;
        last = head -> pre;
        now ->pre = head -> pre;
        last -> next = head ->next;
        if (head) {
            free(head);
            head = NULL;
        }
    }
    return 0;
}

本文所述是 C 语言实现的,源码 在 Go 语言之中, container/list 包实现了双链表,直接引入就可以使用了。

Zookeeper 笔记

| Comments

ZooKeeper 是一个开源的分布式协调服务,是分布式数据一致性的解决方案。

集群角色

在 ZooKeeper 中,有三种角色: Leader,Follower,Observer

一个 ZooKeeper 集群同时只会有一个Leader,其他都是 Follower 或 Observer。 Leader 服务器为客户端提供读和写服务,Follower 和 Observer 都能提供读服务,不能提供写服务。区别在于,Observer 不参与 Leader 选举过程,也不参与写操作的过半写成功策略,因此 Observer 可以在不影响写性能的情况下提升集群的读性能。

会话

客户端和 ZooKeeper 服务器会与服务器建立一个 TCP 连接,通过这个连接,客户端能够通过心跳检测和服务器保持有效的会话,也能够向 ZooKeeper 服务器发送请求并接受响应,同时还能通过该连接接收来自服务器的 Watch 事件通知。

数据节点

ZooKeeper 中的数据节点是指数据模型中的数据单元,称为 ZNode。ZooKeeper将所有数据存储在内存中,数据模型是一棵树(ZNode Tree),由斜杠进行分割的路径,就是一个ZNode。每个ZNode上都会保存自己的数据内容,同时会保存一系列属性信息。每个ZNode不仅本身可以写数据,还可以有下一级文件或目录。

在ZooKeeper中,ZNode可以分为持久节点和临时节点两类。持久节点是指一旦这个 ZNode 被创建了,除非主动进行 ZNode 的移除操作,否则这个 ZNode 将一直保存在 ZooKeeper上。临时节点的生命周期跟客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。

ZooKeeper 允许用户为每个节点添加一个特殊的属性:SEQUENTIAL。一旦节点被标记上这个属性,那么在这个节点被创建的时候,ZooKeeper就会自动在其节点后面追加上一个整型数字,这个整型数字是一个由父节点维护的自增数字。

版本

ZooKeeper 的每个 ZNode 上都会存储数据,对于每个ZNode,ZooKeeper都会为其维护一个叫作Stat的数据结构,Stat中记录了这个ZNode的三个数据版本,分别是 version(当前ZNode的版本, cversion(当前ZNode子节点的版本)和 aversion(当前ZNode的ACL版本)。

事务

在 ZooKeeper 中,能改变 ZooKeeper 服务器状态的操作称为事务操作。包括数据节点创建与删除、数据内容更新和客户端会话创建与失效等操作。对应每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用ZXID表示,通常是一个64位的数字。每一个ZXID对应一次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些事务操作请求的全局顺序。

Watcher

ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去。该机制是 ZooKeeper 实现分布式协调服务的重要特性。

ACL

ZooKeeper 采用 Access Control Lists 策略来进行权限控制。ZooKeeper 定义了如下5种权限。

1
2
3
4
5
6
CREATE: 创建子节点的权限。
READ: 获取节点数据和子节点列表的权限。
WRITE:更新节点数据的权限。
DELETE: 删除子节点的权限。
ADMIN: 设置节点ACL的权限。
注意:CREATE 和 DELETE 都是针对子节点的权限控制。

ZAB 原子广播协议

ZooKeeper Atomic Broadcast(ZAB,ZooKeeper原子广播协议)的协议作为其数据一致性的核心算法。

所有事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被称为Leader服务器,而剩下的其他服务器则成为 Follower 服务器。Leader 服务器负责将一个客户端事务请求转换成一个事务 Proposal(提案)并将该 Proposal分发给集群中所有的 Follower 服务器。之后 Leader 服务器需要等待所有 Follower 服务器的反馈,一旦超过半数的 Follower 服务器进行了正确的反馈后,Leader 就会再次向所有的 Follower 服务器分发 Commit 消息,要求对刚才的 Proposal 进行提交。

应用场景

  • 数据发布与订阅-配置中心

发布者将数据发布到 ZooKeeper 节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和动态更新。全局配置信息就可以发布到 ZooKeeper 上,让客户端(集群的机器)去订阅该消息。

客户端想服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送 Watcher 事件通知,客户端接收到这个消息通知后,需要主动到服务端获取最新的数据(推拉结合)。

  • 命名服务,即生成全局唯一的ID。

  • 分布式协调通知

ZooKeeper 中特有 Watcher 注册与异步通知机制,实现对数据变更的实时处理。不同的客户端都对ZK上同一个ZNode 进行注册,监听 ZNode 的变化(包括ZNode本身内容及子节点的),如果 ZNode 发生了变化,那么所有订阅的客户端都能够接收到相应的 Watcher 通知,并做出相应的处理,是一种通用的分布式系统机器间的通信方式。

  • 心跳检测

基于 ZK 临时节点的特性,可以让不同的进程都在 ZK 的一个指定节点下创建临时子节点,不同的进程直接可以根据这个临时子节点来判断对应的进程是否存活。通过这种方式,检测和被检测系统直接并不需要直接相关联,而是通过 ZK 上的某个节点进行关联,大大减少了系统耦合。

  • 分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁又分为排他锁和共享锁两种。 排他锁又称为写锁或独占锁,共享锁又称为读锁。

把 ZooKeeper 上一个 ZNode 看作是一个锁,获得锁就通过创建ZNode的方式来实现。所有客户端都去/x_lock节点下创建临时子节点/x_lock/lock。ZooKeeper会保证在所有客户端中,最终只有一个客户端能够创建成功,那么就可以认为该客户端获得了锁。同时,所有没有获取到锁的客户端就需要到/x_lock节点上注册一个子节点变更的 Watcher 监听,以便实时监听到 lock 节点的变更情况。

Squid 做正向代理

| Comments

正向代理和反向代理

  • 正向代理

正向代理 是一个位于客户端和原始服务器之间的服务器,为了从原始服务器取得内容,客户端向代理发送一个请求并指定原始服务器,然后代理向原始服务器转交请求并将获得的内容返回给客户端。客户端必须要进行一些特别的设置才能使用正向代理。

  • 反向代理

反向代理正好相反,对于客户端而言它就像是原始服务器,并且客户端不需要进行任何特别的设置。客户端向反向代理内容发送普通请求,接着反向代理将判断向何处(原始服务器)转交请求,并将获得的内容返回给客户端,就像这些内容原本就是它自己的一样。

正向代理软件

Nginx 能做正向代理,遗憾的是它不能做 HTTPS 的正向代理。以下是一个例子。

  • 不能有 hostname。
  • 必须有 resolver, 即 DNS
  • 配置正向代理参数,均是由 Nginx 变量组成
1
2
3
4
5
6
7
8
9
server{
      resolver 8.8.8.8;
      resolver_timeout 30s; 
      listen 80;
      location / {
                proxy_pass http://$http_host$request_uri;
                proxy_set_header Host $http_host;
        }
}

Squid 可正向代理 HTTP 以及 HTTPS

1
sudo apt-get install squid

编辑 /etc/squid3/squid.conf, 并重启

1
2
3
4
5
6
7
http_port 3128                 	#代理服务器的端口
#http_access deny !Safe_ports 	#注释掉此项
#http_access deny manager     	#注释掉此项

#添加下面两项,设置哪些网段可以访问本代理服务器
acl our_networks src 172.16.1.0/24 
http_access allow our_networks
1
sudo service squid3 restart

Squid 的层次代理值得一提, 若我们需要定期地切换代理服务器的话, 启动一个 Squid 代理, 而这个代理会将请求转发到其他代理上面. 然后我们只需定时更新本地 Squid 代理的配置文件, 然后重启这个本地代理即可. 层次代理用到了 cache_peer 这个配置文件

1
2
3
4
cache_peer hostname type http_port icp_port option

e.g.
cache_peer xxx.proxy.com parent 9020 0 no-query default login=xxxxx:yyyy
  • hostname: 指被请求的同级子代理服务器或父代理服务器。可以用主机名或ip地址表示;
  • type:指明 hostname 的类型,是同级子代理服务器还是父代理服务器,也即 parent 还是 sibling;
  • http_port:hostname的监听端口;
  • icp_port:hostname 上的ICP监听端口,对于不支持ICP协议的可指定7;
  • options:可以包含一个或多个关键字。
    1. proxy-only:指明从peer得到的数据在本地不进行缓存,缺省地,squid是要缓存这部分数据的;
    2. weight=n:用于有多个peer的情况,如果多于一个以上的peer拥有你请求的数据时,squid通过计算每个peer ICP 响应时间来 决定其weight的值,然后 squid 向其中拥有最大 weight 的peer发出ICP请求。
    3. no-query:不向该peer发送ICP请求。如果该peer不可用时,可以使用该选项;
    4. Default:有点象路由表中的缺省路由,该peer将被用作最后的尝试手段。当你只有一个父代理服务器并且其不支持ICP协议时,可以使用default和no-query选项让所有请求都发送到该父代理服务器;
  • login=user:password:当你的父代理服务器要求用户认证时可以使用该选项来进行认证

Cgroup 限制计算资源

| Comments

Cgroup 实现了对计算机资源使用上的隔离,它是 Docker 底层的基础技术。我们可以用它来限制程序使用的CPU、内存、磁盘。

安装

在 Ubuntu 14.04 下安装的方法:

1
sudo apt-get install cgroup-bin

安装完后执行 mount -t cgroup 会出现如下,可以看到它其实是一个文件系统

1
2
3
4
cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,relatime,cpuset)
cgroup on /sys/fs/cgroup/cpu type cgroup (rw,relatime,cpu)
...
cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,relatime,hugetlb)

如果没有看到以上的目录,这时候需要手动 mount 了

1
2
3
4
5
6
7
8
9
cd /sys/fs
mkdir cgroup
mount -t tmpfs cgroup_root ./cgroup
mkdir cgroup/cpuset
mount -t cgroup -ocpuset cpuset ./cgroup/cpuset/
mkdir cgroup/cpu
mount -t cgroup -ocpu cpu ./cgroup/cpu/
mkdir cgroup/memory
mount -t cgroup -omemory memory ./cgroup/memory/

实践

我们来感性认识下 cgroup 吧,编写一个耗费 CPU 的程序,姑且叫暴走程序(baozou)

1
2
3
count = 0
while True:
    count = count + 1 - 1

运行该程序,top -p 之,100% CPU使用率

1
2
  PID      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
16515      20   0    2336    728    544 R  100.0  0.0   1:27.23 baozou

我想限制暴走程序 CPU 使用该如何做? 我们手动创建一个 cgroup 目录来针对它。

1
2
3
4
5
cd /sys/fs/cgroup/cpu
mkdir calm      // 名字可自定义
ls /calm        // 该目录下自动生产与 CPU 有关的文件
cgroup.clone_children  cpu.cfs_period_us  cpu.shares  notify_on_release
cgroup.procs           cpu.cfs_quota_us   cpu.stat    tasks

接着写入限制规则

1
2
3
4
5
// 默认是100000,20000意味着限制它的cpu为20%
echo 20000 > /sys/fs/cgroup/cpu/calm/cpu.cfs_quota_us,

// 写入程序的 PID 16515
echo 16515 > /sys/fs/cgroup/cpu/calm/tasks

于是 CPU 就降到 20% 。

1
2
  PID       PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
16515       20   0    2336    728    544 R  20.0  0.0   1:27.23 baozou

除了这种需要指定 PID 来限制资源的方法,也可通过指定规则来执行,更显得方便,效果和上述一致。

1
sudo cgexec -g cpu:calm ./baozou

可以看看这个限制规则做了什么?

1
2
3
4
5
6
7
8
$ sudo cgget calm
calm:
cpu.shares: 1024
cpu.cfs_quota_us: 20000
cpu.stat: nr_periods 6943
    nr_throttled 6941
    throttled_time 563080015831
cpu.cfs_period_us: 100000

上述的例子中,我们手动创建了 calm, 其实也能通过命令来做到的

1
2
3
4
5
6
7
8
9
10
11
// 创建cgroup 文件目录
sudo cgcreate -g cpu:/calm -g memory:/calm

// 设置限制的参数
sudo cgset -r cpu.shares=200 calm

// 限制了内存
sudo cgset -r memory.limit_in_bytes=64k calm

// 可以删除
sudo cgdelete cpu/calm memory:/calm

参考链接:

记一次使用 Redis 协议诡异的bug

| Comments

记录昨天定位一个诡异 bug 的过程,我耗费了不少精力,你若有兴趣,请带着一点耐心看完它。

问题背景

我们使用 go-redis-server 开发具有 redis 协议的服务。 按照文档,我们实现了如下接口,其背后访问的是 AWS 的 Dynamodb,我们的服务也开发了监控接口,以供我们这些程序狗知道它发生了什么。

1
2
3
func (handler *RedisHandler) Get(key string) (result string, err error)
func (handler *RedisHandler) Set(key string, val string) (err error)
func (handler *RedisHandler) Del(key string) (count int, err error)

这样,我们就能通过 redis 客户端来执行 Get,Set,Del 操作。

我在批量写入几千条数据时,通过监控接口,我看到服务突然卡住的样子,没有 Get,Set的统计信息了。但我能肯定的是客户端一直有数据在往服务写入,或者读写。 同时从 AWS 监控得到的反馈是 Dynamodb 使用超过预设值,我调高了读写预设值,重启服务,就恢复可用(重启大法好)。 程序试运行十多天,只发生过一次异常,之后再无重现。 这个事情没有搞清楚,就成为我一个心结。

重现问题

我们来看看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (handler *RedisHandler)
Set(key string, val string) (err error) {
	...

	m, err := JSONToMap(val)
	_, err = table.PutItem(m)
	if err != nil {
		log.Log.Errorf("PutItem failed, 
		table: %s, primary key: %s, value: %+v, 
		err: %s",
		tableName, primaryVal, m, err.Error())
		return
	}
	return
}

Set接口,简单的将数据写入 Dynamodb,Dynamodb 如果异常就返回错误,然后通过redis协议返回给客户端。

我很大程度确定那时候是因为 AWS Dynamodb 的异常导致这个错误的。除非这个巧合太牛逼了,难道 go-redis-server 接口不支持返回错误不成吗?这个猜想很快就被我们用实验否定了。

我想重新触发 AWS Dynamodb 返回写容量超标的错误,测试了一阵子,并不容易重现。有点沮丧,这个时候我回忆起 aws-go-sdk 的特点,如果给Dynamodb 字段 赋予空值,是会有异常返回的, 类似如下:

1
2
3
ValidationException: One or more parameter values were invalid:
An AttributeValue may not contain an empty string
	status code: 400

空值的测试明显容易制造。我在本地也开启服务,端口是1234,用 pyredis 作为客户端做测试。 测试脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import redis
import json
import random
r = redis.Redis(host='localhost',port=1234,db=0)

table_name = 'test'
primary_key = 'a'

i = 0
while True:
    if i > 2:
        break
    data = {
        'a': str(random.random()),
        'b': ‘’,
    }
    key = '{0}:{1}:{2}'.format(table_name, primary_key, data[primary_key])
    value = json.dumps(data)
    r.set(key, value)
    i = i + 1

发现测试程序卡在终端,一动不动,strace 测试程序

1
2
3
$ sudo strace -p 22404
Process 22404 attached
recvfrom(3,

感觉像是在等待服务器返回,但是等不到回报的样子。

试着解决问题

难道 go-redis-server 这个框架有猫腻,我就开始了一下午的看源码之旅。不得不说源码写的真好。回头看我们出错的代码片段,我试着修改 err 信息,修改成自己定义的错误字符串。

1
2
3
4
5
6
7
8
9
10
11
func (handler *RedisHandler) Set(key string, val string) (err error) {
	...

	m, err := JSONToMap(val)
	_, err = table.PutItem(m)
	if err != nil {
		err = errors.New("I am a Custom Error")
		return
	}
	return
}

再次执行测试脚本,这一次。2条测试数据很快就执行结束,并无阻塞。 好像看到了一丝曙光:error 内容的不一样,导致不一样的结果。类型一样,那么我能怀疑的就是格式,或者长度了。

1
2
3
4
5
6
7
AWS 错误信息的格式:
ValidationException: One or more parameter values were invalid:
An AttributeValue may not contain an empty string
	status code: 400

我自定义错误信息的格式:
I am a Custom Error

我特意加长了自定义错误的长度,结果也是能顺利执行不阻塞客户端。但是我给自定义错误字符串加入了换行符,果然客户端再次测试会出现阻塞。当出现错误的时候,源码中是调用了 ErrorReply.WriteTo 这个函数,特别的。返回错误的协议格式是

第一个字节将是“-”,并以CR + LF 结尾

配合源码,以下是 go-redis-server 最核心的逻辑调度代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for {
	request, err := parseRequest(conn)
	if err != nil {
		return err
	}
		request.Host = clientAddr
		request.ClientChan = clientChan
		reply, err := srv.Apply(request)
		if err != nil {
			return err
		}
		if _, err = reply.WriteTo(conn); err != nil {
			return err
		}
	}
	return nil
}
func (er *ErrorReply) WriteTo(w io.Writer) (int64, error) {
	n, err := w.Write([]byte("-" + er.code + " " + er.message + "\r\n"))
	return int64(n), err
}

Redis协议官方文档,可以确定客户端与服务器端之间传输的每个 Redis 命令或者数据都以 \r\n 结尾。 我们的错误信息中间杀出了 \n,导致协议错乱,redis 客户端不能理解协议,就阻塞了。

一点总结

  • 以后我们使用 go-redis-server 的服务时候,要记得检查返回的字符串或者错误信息有没有包含换行符,如果有,最好做一次过滤替换。

  • 出现这个bug,我和同事都觉得不可思议,非常神奇。在没有其他直观线索的条件下,阅读使用的库的源码,并在源码加上一些输出验证语言加以辅助,收到了效果,的确需要一些耐心。但我觉得是值得的。

  • 李笑来说有效阅读就是精度,这次阅读代码过程中我还有意外的收获,我发现了 reflect 的妙用,以及函数注册在框架可以那么用,读完觉得很满足的样子,值得再记录一番。


附链接:

共享内存与信号量

| Comments

今天和朋友聊天,他多次提到了共享内存,惭愧的是我没怎么用上,只是从 APUE 等神书阅读到此类名词。这个周末,我来搞懂它们。

共享内存

它是 Linux 最底层的通信机制,被称为最快的通信机制。多个进程共享同一个内存区域实现进程间通信。一个进程创建一个共享内存区域,并将数据存放到共享内存中,而后多个进程对其进行访问。

借鉴网友的例子,我做了注释和修改,一个进程写共享内存 (shmwrite.c),一个进程读共享内存(shmread.c)。

共享内存并未提供同步机制,在第一个进程结束对共享内存的写操作之前,并无自动机制阻止第二个进程开始对它进行读取。上述代码中,我通过自己维护了一个变量 isWritten 来控制同步行为。

还好,伟大的计算机先驱们提供了信号量来帮我们解决同步的问题。

信号量

为了防止出现因多个程序同时访问一个共享资源带来的问题,Linux 使用 信号量协调进程对共享资源的访问的。 信号量只能进行两种操作等待和发送信号,即 P(sv) 和 V(sv).

  • P(sv):当sv的值大于零,就减1;当它的值为零,就挂起该进程的执行。
  • V(sv):当有其他进程因等待sv而被挂起,就让它恢复运行,当没有进程因等待sv而挂起,就给它加1.

比如:两个进程共享信号量 sv,其中一个进程执行了 P(sv) 操作,它将得到信号量,进入临界区,将 sv 减1。此时sv=0,第二个进程将被阻止进入临界区,它会被挂起以等待第一个进程离开临界区域,并执行 V(sv) 释放信号量,这时第二个进程就可以恢复执行。

mmap 还是 shmget

这两个东西某种程度上很类似。

内存映射,将用户空间的一段内存区域映射到内核空间,用户对这段内存区域的修改可以直接反映到内核空间,同样,内核空间对这段区域的修改也直接反映用户空间。两者之间需要大量数据传输等操作的话效率是非常高的.

mmap 并不是完全为了用于共享内存而设计的。它提供了不同于一般对普通文件的访问方式,进程可以像读写内存一样对普通文件的操作。而 Posix 或系统V的共享内存 IPC 则纯粹用于共享目的.

mmap 使得进程之间通过映射同一个普通文件实现共享内存。普通文件被映射到进程地址空间后,进程可以像访问普通内存一样对文件进行访问,不必再调用 read(),write() 等操作。mmap 并不分配空间, 只是将文件映射到调用进程的地址空间里 然后你就可以用 memcpy 等操作写文件。


附上代码:

CuckooFilter,BloomFilter的优化

| Comments

面对海量数据,我们需要一个索引数据结构,用来帮助查询,快速判断数据记录是否存在,这类数据结构叫过滤器,常用的选择是 Bloom Filter. 而 Cuckoo Filter 是它的优化变种。借此也用 Golang 实践了这个算法

goCuckoo

Bloom Filter 的位图模式有两个问题:

  • 误报,它能判断元素一定不存在,但只能判断可能存在,因为存在其它元素被映射到部分相同位上,导致该位置1,那么一个不存在的元素可能会被误报成存在;
  • 漏报,如果删除了某个元素,导致该映射位被置0,那么本来存在的元素会被漏报成不存在。

Cuckoo Filter,可以确保该元素存在的必然性,又可以在不违背此前提下删除任意元素,仅仅比 Bloom Filter 牺牲了微量空间效率。 它的的数据模型:

  • 每个元素对应两个哈希算法,在哈希碰撞时会启用备用哈希算法。
  • 每一个桶是有4路的,每个槽对应一个指纹。

model

Feature

  • 支持删除操作
  • 支持快速查找,支持 O(1) 查找速度
  • 高效的空间利用,四路槽的表,可以有95% 的空间利用率
  • 可替代布隆过滤器

Installation

1
go get github.com/zheng-ji/goCuckoo

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import (
	"fmt"
	"github.com/zheng-ji/goCuckoo"
)

func main() {
    // speicify capacity 
	filter := cuckoo.NewCuckooFilter(10000)

	filter.Insert([]byte("zheng-ji"))
	filter.Insert([]byte("stupid"))
	filter.Insert([]byte("coder"))

	if filter.Find([]byte("stupid")) {
		fmt.Println("exist")
	} else {
		fmt.Println("Not exist")
	}

	filter.Del([]byte("stupid"))
	filter.Println(filter.Size())
}

参考

Celery的Crontab实践

| Comments

有时候我们需要处理耗时的操作,同时又要保持较快的响应速度,就需要借助异步队列的帮助。Celery 作为异步队列服务,想必是很多人和我一样的选择。用法在官方文档也详细介绍,不再赘述。

这次想记录的是用 Celery 来实现定时任务。这里也有一点点坑。

以下是 main.py 的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from celery import Celery
from lib import distribute
from celery.schedules import crontab

app = distribute.app
app.conf.update(
    CELERYBEAT_SCHEDULE = {
        'every-minute': {
            'task': 'test_cron',
            'schedule': crontab(minute="*"),
            'args': (16, 13),
        }
    },
    CELERY_INCLUDE=("apps.tasks",)
)
if __name__ == '__main__':
    app.start()

实际工作单元,我放在 apps 目录下的 tasks.py 文件中

1
2
3
4
from lib.distribute import app
@app.task(name="test_cron")
def mul(x, y):
    return x * y

上述是一个简单的 Crontab 应用,它仅需要以下命令就能执行, 其中 --beat 表示 crontab 的应用

1
python main.py worker --beat -l info

起初我想把异步队列和定时任务放在一起,就加上了一句 CELERY_QUEUES 的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
app.conf.update(
    // 添加的部分
    CELERY_QUEUES=(
        Queue(
          'test', Exchange('test_exchange'),
           routing_key='test_queue'
        ),
    ),
    CELERYBEAT_SCHEDULE = {
        'every-minute': {
            'task': 'test_cron',
            'schedule': crontab(minute="*"),
            'args': (16, 13),
        }
    },
    CELERY_INCLUDE=("apps.tasks",)
)

同样用上述命令开启worker,发现这个时候 Crontab 不能工作了,后来看到官方的文档:

celery beat and celery worker as separate services instead.

也就是说 Celery 的 Beat 需要和其他异步worker 分开,单独执行。

相关代码链接