织网

身体和灵魂,总有一个在路上

Cgroup 限制计算资源

| Comments

Cgroup 实现了对计算机资源使用上的隔离,它是 Docker 底层的基础技术。我们可以用它来限制程序使用的CPU、内存、磁盘。

安装

在 Ubuntu 14.04 下安装的方法:

1
sudo apt-get install cgroup-bin

安装完后执行 mount -t cgroup 会出现如下,可以看到它其实是一个文件系统

1
2
3
4
cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,relatime,cpuset)
cgroup on /sys/fs/cgroup/cpu type cgroup (rw,relatime,cpu)
...
cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,relatime,hugetlb)

如果没有看到以上的目录,这时候需要手动 mount 了

1
2
3
4
5
6
7
8
9
cd /sys/fs
mkdir cgroup
mount -t tmpfs cgroup_root ./cgroup
mkdir cgroup/cpuset
mount -t cgroup -ocpuset cpuset ./cgroup/cpuset/
mkdir cgroup/cpu
mount -t cgroup -ocpu cpu ./cgroup/cpu/
mkdir cgroup/memory
mount -t cgroup -omemory memory ./cgroup/memory/

实践

我们来感性认识下 cgroup 吧,编写一个耗费 CPU 的程序,姑且叫暴走程序(baozou)

1
2
3
count = 0
while True:
    count = count + 1 - 1

运行该程序,top -p 之,100% CPU使用率

1
2
  PID      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
16515      20   0    2336    728    544 R  100.0  0.0   1:27.23 baozou

我想限制暴走程序 CPU 使用该如何做? 我们手动创建一个 cgroup 目录来针对它。

1
2
3
4
5
cd /sys/fs/cgroup/cpu
mkdir calm      // 名字可自定义
ls /calm        // 该目录下自动生产与 CPU 有关的文件
cgroup.clone_children  cpu.cfs_period_us  cpu.shares  notify_on_release
cgroup.procs           cpu.cfs_quota_us   cpu.stat    tasks

接着写入限制规则

1
2
3
4
5
// 默认是100000,20000意味着限制它的cpu为20%
echo 20000 > /sys/fs/cgroup/cpu/calm/cpu.cfs_quota_us,

// 写入程序的 PID 16515
echo 16515 > /sys/fs/cgroup/cpu/calm/tasks

于是 CPU 就降到 20% 。

1
2
  PID       PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
16515       20   0    2336    728    544 R  20.0  0.0   1:27.23 baozou

除了这种需要指定 PID 来限制资源的方法,也可通过指定规则来执行,更显得方便,效果和上述一致。

1
sudo cgexec -g cpu:calm ./baozou

可以看看这个限制规则做了什么?

1
2
3
4
5
6
7
8
$ sudo cgget calm
calm:
cpu.shares: 1024
cpu.cfs_quota_us: 20000
cpu.stat: nr_periods 6943
    nr_throttled 6941
    throttled_time 563080015831
cpu.cfs_period_us: 100000

上述的例子中,我们手动创建了 calm, 其实也能通过命令来做到的

1
2
3
4
5
6
7
8
9
10
11
// 创建cgroup 文件目录
sudo cgcreate -g cpu:/calm -g memory:/calm

// 设置限制的参数
sudo cgset -r cpu.shares=200 calm

// 限制了内存
sudo cgset -r memory.limit_in_bytes=64k calm

// 可以删除
sudo cgdelete cpu/calm memory:/calm

参考链接:

记一次使用 Redis 协议诡异的bug

| Comments

记录昨天定位一个诡异 bug 的过程,我耗费了不少精力,你若有兴趣,请带着一点耐心看完它。

问题背景

我们使用 go-redis-server 开发具有 redis 协议的服务。 按照文档,我们实现了如下接口,其背后访问的是 AWS 的 Dynamodb,我们的服务也开发了监控接口,以供我们这些程序狗知道它发生了什么。

1
2
3
func (handler *RedisHandler) Get(key string) (result string, err error)
func (handler *RedisHandler) Set(key string, val string) (err error)
func (handler *RedisHandler) Del(key string) (count int, err error)

这样,我们就能通过 redis 客户端来执行 Get,Set,Del 操作。

我在批量写入几千条数据时,通过监控接口,我看到服务突然卡住的样子,没有 Get,Set的统计信息了。但我能肯定的是客户端一直有数据在往服务写入,或者读写。 同时从 AWS 监控得到的反馈是 Dynamodb 使用超过预设值,我调高了读写预设值,重启服务,就恢复可用(重启大法好)。 程序试运行十多天,只发生过一次异常,之后再无重现。 这个事情没有搞清楚,就成为我一个心结。

重现问题

我们来看看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (handler *RedisHandler) 
Set(key string, val string) (err error) {
	...

	m, err := JSONToMap(val)
	_, err = table.PutItem(m)
	if err != nil {
		log.Log.Errorf("PutItem failed, 
		table: %s, primary key: %s, value: %+v, 
		err: %s", 
		tableName, primaryVal, m, err.Error())
		return
	}
	return
}

Set接口,简单的将数据写入 Dynamodb,Dynamodb 如果异常就返回错误,然后通过redis协议返回给客户端。

我很大程度确定那时候是因为 AWS Dynamodb 的异常导致这个错误的。除非这个巧合太牛逼了,难道 go-redis-server 接口不支持返回错误不成吗?这个猜想很快就被我们用实验否定了。

我想重新触发 AWS Dynamodb 返回写容量超标的错误,测试了一阵子,并不容易重现。有点沮丧,这个时候我回忆起 aws-go-sdk 的特点,如果给Dynamodb 字段 赋予空值,是会有异常返回的, 类似如下:

1
2
3
ValidationException: One or more parameter values were invalid:
An AttributeValue may not contain an empty string
	status code: 400

空值的测试明显容易制造。我在本地也开启服务,端口是1234,用 pyredis 作为客户端做测试。 测试脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import redis
import json
import random
r = redis.Redis(host='localhost',port=1234,db=0)

table_name = 'test'
primary_key = 'a'

i = 0
while True:
    if i > 2:
        break
    data = {
        'a': str(random.random()),
        'b': ‘’,
    }
    key = '{0}:{1}:{2}'.format(table_name, primary_key, data[primary_key])
    value = json.dumps(data)
    r.set(key, value)
    i = i + 1

发现测试程序卡在终端,一动不动,strace 测试程序

1
2
3
$ sudo strace -p 22404
Process 22404 attached
recvfrom(3,

感觉像是在等待服务器返回,但是等不到回报的样子。

试着解决问题

难道 go-redis-server 这个框架有猫腻,我就开始了一下午的看源码之旅。不得不说源码写的真好。回头看我们出错的代码片段,我试着修改 err 信息,修改成自己定义的错误字符串。

1
2
3
4
5
6
7
8
9
10
11
func (handler *RedisHandler) Set(key string, val string) (err error) {
	...

	m, err := JSONToMap(val)
	_, err = table.PutItem(m)
	if err != nil {
		err = errors.New("I am a Custom Error")
		return
	}
	return
}

再次执行测试脚本,这一次。2条测试数据很快就执行结束,并无阻塞。 好像看到了一丝曙光:error 内容的不一样,导致不一样的结果。类型一样,那么我能怀疑的就是格式,或者长度了。

1
2
3
4
5
6
7
AWS 错误信息的格式:
ValidationException: One or more parameter values were invalid:
An AttributeValue may not contain an empty string
	status code: 400

我自定义错误信息的格式:
I am a Custom Error

我特意加长了自定义错误的长度,结果也是能顺利执行不阻塞客户端。但是我给自定义错误字符串加入了换行符,果然客户端再次测试会出现阻塞。当出现错误的时候,源码中是调用了 ErrorReply.WriteTo 这个函数,特别的。返回错误的协议格式是

第一个字节将是“-”,并以CR + LF 结尾

配合源码,以下是 go-redis-server 最核心的逻辑调度代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for {
	request, err := parseRequest(conn)
	if err != nil {
		return err
	}
		request.Host = clientAddr
		request.ClientChan = clientChan
		reply, err := srv.Apply(request)
		if err != nil {
			return err
		}
		if _, err = reply.WriteTo(conn); err != nil {
			return err
		}
	}
	return nil
}
func (er *ErrorReply) WriteTo(w io.Writer) (int64, error) {
	n, err := w.Write([]byte("-" + er.code + " " + er.message + "\r\n"))
	return int64(n), err
}

Redis协议官方文档,可以确定客户端与服务器端之间传输的每个 Redis 命令或者数据都以 \r\n 结尾。 我们的错误信息中间杀出了 \n,导致协议错乱,redis 客户端不能理解协议,就阻塞了。

一点总结

  • 以后我们使用 go-redis-server 的服务时候,要记得检查返回的字符串或者错误信息有没有包含换行符,如果有,最好做一次过滤替换。

  • 出现这个bug,我和同事都觉得不可思议,非常神奇。在没有其他直观线索的条件下,阅读使用的库的源码,并在源码加上一些输出验证语言加以辅助,收到了效果,的确需要一些耐心。但我觉得是值得的。

  • 李笑来说有效阅读就是精度,这次阅读代码过程中我还有意外的收获,我发现了 reflect 的妙用,以及函数注册在框架可以那么用,读完觉得很满足的样子,值得再记录一番。


附链接:

共享内存与信号量

| Comments

今天和朋友聊天,他多次提到了共享内存,惭愧的是我没怎么用上,只是从 APUE 等神书阅读到此类名词。这个周末,我来搞懂它们。

共享内存

它是 Linux 最底层的通信机制,被称为最快的通信机制。多个进程共享同一个内存区域实现进程间通信。一个进程创建一个共享内存区域,并将数据存放到共享内存中,而后多个进程对其进行访问。

借鉴网友的例子,我做了注释和修改,一个进程写共享内存 (shmwrite.c),一个进程读共享内存(shmread.c)。

共享内存并未提供同步机制,在第一个进程结束对共享内存的写操作之前,并无自动机制阻止第二个进程开始对它进行读取。上述代码中,我通过自己维护了一个变量 isWritten 来控制同步行为。

还好,伟大的计算机先驱们提供了信号量来帮我们解决同步的问题。

信号量

为了防止出现因多个程序同时访问一个共享资源带来的问题,Linux 使用 信号量协调进程对共享资源的访问的。 信号量只能进行两种操作等待和发送信号,即 P(sv) 和 V(sv).

  • P(sv):当sv的值大于零,就减1;当它的值为零,就挂起该进程的执行。
  • V(sv):当有其他进程因等待sv而被挂起,就让它恢复运行,当没有进程因等待sv而挂起,就给它加1.

比如:两个进程共享信号量 sv,其中一个进程执行了 P(sv) 操作,它将得到信号量,进入临界区,将 sv 减1。此时sv=0,第二个进程将被阻止进入临界区,它会被挂起以等待第一个进程离开临界区域,并执行 V(sv) 释放信号量,这时第二个进程就可以恢复执行。

mmap 还是 shmget

这两个东西某种程度上很类似。

内存映射,将用户空间的一段内存区域映射到内核空间,用户对这段内存区域的修改可以直接反映到内核空间,同样,内核空间对这段区域的修改也直接反映用户空间。两者之间需要大量数据传输等操作的话效率是非常高的.

mmap 并不是完全为了用于共享内存而设计的。它提供了不同于一般对普通文件的访问方式,进程可以像读写内存一样对普通文件的操作。而 Posix 或系统V的共享内存 IPC 则纯粹用于共享目的.

mmap 使得进程之间通过映射同一个普通文件实现共享内存。普通文件被映射到进程地址空间后,进程可以像访问普通内存一样对文件进行访问,不必再调用 read(),write() 等操作。mmap 并不分配空间, 只是将文件映射到调用进程的地址空间里 然后你就可以用 memcpy 等操作写文件。


附上代码:

CuckooFilter,BloomFilter的优化

| Comments

面对海量数据,我们需要一个索引数据结构,用来帮助查询,快速判断数据记录是否存在,这类数据结构叫过滤器,常用的选择是 Bloom Filter. 而 Cuckoo Filter 是它的优化变种。借此也用 Golang 实践了这个算法

goCuckoo

Bloom Filter 的位图模式有两个问题:

  • 误报,它能判断元素一定不存在,但只能判断可能存在,因为存在其它元素被映射到部分相同位上,导致该位置1,那么一个不存在的元素可能会被误报成存在;
  • 漏报,如果删除了某个元素,导致该映射位被置0,那么本来存在的元素会被漏报成不存在。

Cuckoo Filter,可以确保该元素存在的必然性,又可以在不违背此前提下删除任意元素,仅仅比 Bloom Filter 牺牲了微量空间效率。 它的的数据模型:

  • 每个元素对应两个哈希算法,在哈希碰撞时会启用备用哈希算法。
  • 每一个桶是有4路的,每个槽对应一个指纹。

model

Feature

  • 支持删除操作
  • 支持快速查找,支持 O(1) 查找速度
  • 高效的空间利用,四路槽的表,可以有95% 的空间利用率
  • 可替代布隆过滤器

Installation

1
go get github.com/zheng-ji/goCuckoo

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import (
	"fmt"
	"github.com/zheng-ji/goCuckoo"
)

func main() {
    // speicify capacity 
	filter := cuckoo.NewCuckooFilter(10000)

	filter.Insert([]byte("zheng-ji"))
	filter.Insert([]byte("stupid"))
	filter.Insert([]byte("coder"))

	if filter.Find([]byte("stupid")) {
		fmt.Println("exist")
	} else {
		fmt.Println("Not exist")
	}

	filter.Del([]byte("stupid"))
	filter.Println(filter.Size())
}

参考

Celery的Crontab实践

| Comments

有时候我们需要处理耗时的操作,同时又要保持较快的响应速度,就需要借助异步队列的帮助。Celery 作为异步队列服务,想必是很多人和我一样的选择。用法在官方文档也详细介绍,不再赘述。

这次想记录的是用 Celery 来实现定时任务。这里也有一点点坑。

以下是 main.py 的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from celery import Celery
from lib import distribute
from celery.schedules import crontab

app = distribute.app
app.conf.update(
    CELERYBEAT_SCHEDULE = {
        'every-minute': {
            'task': 'test_cron',
            'schedule': crontab(minute="*"),
            'args': (16, 13),
        }
    },
    CELERY_INCLUDE=("apps.tasks",)
)
if __name__ == '__main__':
    app.start()

实际工作单元,我放在 apps 目录下的 tasks.py 文件中

1
2
3
4
from lib.distribute import app
@app.task(name="test_cron")
def mul(x, y):
    return x * y

上述是一个简单的 Crontab 应用,它仅需要以下命令就能执行, 其中 --beat 表示 crontab 的应用

1
python main.py worker --beat -l info

起初我想把异步队列和定时任务放在一起,就加上了一句 CELERY_QUEUES 的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
app.conf.update(
    // 添加的部分
    CELERY_QUEUES=(
        Queue(
          'test', Exchange('test_exchange'),
           routing_key='test_queue'
        ),
    ),
    CELERYBEAT_SCHEDULE = {
        'every-minute': {
            'task': 'test_cron',
            'schedule': crontab(minute="*"),
            'args': (16, 13),
        }
    },
    CELERY_INCLUDE=("apps.tasks",)
)

同样用上述命令开启worker,发现这个时候 Crontab 不能工作了,后来看到官方的文档:

celery beat and celery worker as separate services instead.

也就是说 Celery 的 Beat 需要和其他异步worker 分开,单独执行。

相关代码链接

环境变量的那些事

| Comments

四种模式下的环境变量加载

名词解析

  1. login shell: 指用户以非图形化界面 ssh登陆到机器上时获得的第一个 shell。
  2. interactive: 交互式,有输入提示符,它的标准输入输出和错误输出都会显示在控制台上。
  • interactive + login shell

比如登陆机器后的第一个 shell 就是这种场景。它首先加载 /etc/profile,然后再依次去加载下列三个配置文件之一,一旦找到其中一个便不再接着寻找

1
2
3
~/.bash_profile
~/.bash_login
~/.profile

设计如此多的配置是为了兼容 bourne shell 和 C shell,尽量杜绝使用 .bash_login,如果已创建,需要创建 .bash_profile 覆盖

  • noninteractive + login shell

bash -l script.sh 就是这种场景。-l 参数是将shell作为一个login shell启动,配置文件的加载与第一种完全一样。

  • interactive + non-login shell

在一个已有shell中运行bash,此时会打开一个交互式的shell,因为不再需要登陆,所以不是login shell。启动 shell 时会去查找并加载

1
2
/etc/bash.bashrc
~/.bashrc 
  • non-interactive + non-login shell

比如执行脚本 bash script.sh 或者 ssh user@remote command。这两种都是创建一个shell,执行完脚本之后便退出,不再需要与用户交互。它会去寻找环境变量BASH_ENV,将变量的值作为文件名进行查找,如果找到便加载它。

从网上看到一个清晰的图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+----------------+--------+-----------+---------------+
|                | login  |interactive|non-interactive|
|                |        |non-login  |non-login      |
+----------------+--------+-----------+---------------+
|/etc/profile    |   A    |           |               |
+----------------+--------+-----------+---------------+
|/etc/bash.bashrc|        |    A      |               |
+----------------+--------+-----------+---------------+
|~/.bashrc       |        |    B      |               |
+----------------+--------+-----------+---------------+
|~/.bash_profile |   B1   |           |               |
+----------------+--------+-----------+---------------+
|~/.bash_login   |   B2   |           |               |
+----------------+--------+-----------+---------------+
|~/.profile      |   B3   |           |               |
+----------------+--------+-----------+---------------+
|BASH_ENV        |        |           |       A       |
+----------------+--------+-----------+---------------+

跨机器传递环境变量

假设要传递的变量叫做 $VARNAME

客户端机器的 /etc/ssh_config 添加

1
SendEnv VARNAME

服务端机器的 /etc/sshd_config 添加

1
AcceptEnv VARNAME

客户端机器的 $VARNAME 就可以通过 ssh 传递到服务端机器,继续使用.


参考

ssh连接远程主机执行脚本的环境变量问题

准确监控 MySQL 复制延迟

| Comments

MySQL 建立主从复制后,在 Slave_IO_Running,Slave_SQL_Runing 都是 Yes 的前提下,通过监控 Second_Behind_Master 的数值来判断主从延迟时间,该值为0时是否意味着主从同步是无延迟的呢?

1
2
3
4
5
6
7
8
mysql> show slave status\G;
*************************** 1. row ***************************
Slave_IO_State: Waiting for master to send event
....
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
Seconds_Behind_Master: 0
...

很遗憾,我们并不能这样去判断,因为你看到的有可能是假象。

MySQL的同步是异步完成的,其中

  • IO thread 接收从主库的 binlog,然后在从库生成 relay log
  • SQL thead 解析 relay log 后在从库上进行重放

Second_Behind_Master(以下简称SBM) 是 SQL thread 在执行IO thread 生成的relay log的时间差。relay log中event的时间戳是主库上的时间戳,而SQL thread的时间戳是从库上的,SBM 代表的是从库延后主库的时间差。

主库上执行了一个大的操作,这个操作在主库上没执行完毕的时候,从库的 SBM 会显示为0,而当主库执行完毕传到从库上开始执行的时候,SBM 就会显示很大,在网络状况不好的情况下,更是容易出现 SBM 在零和一个巨大的数值反复飘忽的现象。

pt-heartbeat 帮我们准确地检测

pt-heartbeat 是 percona-toolkit 中用来检测主从延迟的工具,需要在主库和从库同时配合才能完成

  • 首先在主库创建监控的表,并定时更新
1
2
3
4
5
6
7
8
9
//创建 heartbeat 
pt-heartbeat --user=root --ask-pass \
            --host=localhost -D <YourDatabase> \
            --create-table --update 

//每隔60s,定时更新状态,以守护进程的方式执行
pt-heartbeat --user=root --ask-pass \
           --host=localhost -D <YourDatabase>\
           --interval=60 --update --replace --daemonize

它会在指定的数据库里生产一张名为 heartbeat 的表,每隔60秒定时更新binlog 文件和位置,以及时间戳。

1
2
3
4
5
+----------------------------+-----------+------------------+-----------+-----------------------+---------------------+
| ts                         | server_id | file             | position  | relay_master_log_file | exec_master_log_pos |
+----------------------------+-----------+------------------+-----------+-----------------------+---------------------+
| 2016-06-03T22:26:29.000720 |         6 | mysql-bin.004| 716| mysql-bin.002|           291330290 |
+----------------------------+-----------+------------------+-----------+-----------------------+---------------------+
  • 接着在从库以守护进程执行定期检测,并将结果重定向到文本
1
2
3
pt-heartbeat --user=root --ask-pass \
     --host=localhost -D <YourDatabase> --interval=60 \
     --file=/tmp/output.txt --monitor --daemonize

文本的内容只有一行,每隔指定的时间就会被覆盖

1
29.00s [ 30.20s,  6.04s,  2.01s ]

29s 表示的是瞬间的延迟时间,30.20s 表示1分钟的延迟时间,6.04秒表示5分钟的延迟时间,2.01秒表示以及15分钟的延迟时间,在主从机器时间校准的前提下,这个数据才是客观准确的主从延迟。

Ansible Dynamic Inventory

| Comments

Ansible 在使用的过程中,如果机器数量比较固定,且变更不多的情况下,可在 /etc/ansible/hosts 文件里面配置固定的组合机器IP, 并给他起组的别名,执行 Ansible 脚本便可以通过别名找到相应的机器。

1
2
[webservers]
111.222.333.444 ansible_ssh_port=888

假如你有很多台机器,且机器经常变更导致IP时常变换,你还想把IP逐个写入 /etc/ansible/hosts 就不现实了。你也许会问,若不把 IP 写进 /etc/ansible/hosts,那不是没法用 Ansible 指挥这些机器? 感谢 Ansible Dynamic Inventory, 如果我们能通过编程等手段获取变更机器的IP,我们还是有办法实现的。

Dynamic Inventory 的原理

  • 通过编程的方式,也就是动态获取机器的 json 信息;
  • Ansible 通过解析这串 json 字符串;
1
ansible -i yourprogram.py -m raw  -a 'cd /home'

Ansible Dynamic Inventory 对程序返回的 json 的转义是这样的:

1
{"devtest-asg": {"hosts": ["172.31.21.164"], "vars": {"ansible_ssh_port": 12306}}}

翻译一下就是 /etc/ansible/hosts 中的:

1
2
[devtest-asg]
172.31.21.164 ansible_ssh_port=12306

一个实战的例子

官方文档对 Inventory 仅作概念性描述,阅读完后仍是一头雾水,不知如何下手。 让我们用一个例子来豁然开朗吧。 我们使用 AWS 的 AutoScaling Group,以下简称 ASG,ASG 会在某种自定义的条件下会自动开启和关闭机器,这给我们在辨别IP,定位机器的时候造成困扰。因此我们需要 Ansible Dynamic Inventory

我们使用 AWS 的 boto 库来获取 ASG 的实例信息.以下程序(get_host.py)中要实现的方法就是列出返回机器信息的 json 串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import boto
import boto.ec2
import boto.ec2.autoscale

AWS_REGION = 'BBB'
AWS_ACCESS_KEY = 'xxxx'
AWS_SECRET_KEY = 'yyy'

result = {}
def getData():
    conn_as = boto.ec2.autoscale.connect_to_region(
            'cn-north-1',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY)
    group = conn_as.get_all_groups(names=['devtest-asg'])[0]
    conn_ec2 = boto.ec2.connect_to_region(
            AWS_REGION,
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY)

    instance_ids = [i.instance_id for i in group.instances]
    reservations = conn_ec2.get_all_instances(instance_ids)
    instances = [i for r in reservations for i in r.instances]

    result['devtest-asg'] = {}
    result['devtest-asg']['hosts'] = []
    for r in reservations:
        for i in r.instances:
            result['devtest-asg']['hosts'].append('%s' % i.private_ip_address)
            result['devtest-asg']['vars'] = {'ansible_ssh_port': 36000}

def getlists():
    getData()
    print json.dumps(result)

getlists()

执行以下命令就可以愉快地使用 Ansible 了,其中 devtest-asg 是 ASG 的别名:

1
ansible -i get_host.py  devtest-asg -m raw -a 'ls /'

Flume 实时收集 Nginx 日志

| Comments

在分布式系统中,各个机器都有程序运行的本地日志,有时为了分析需求,不得不这些分散的日志汇总需求,相信很多人会选择 Rsync,Scp 之类, 但它们的实时性不强,而且也会带来名字冲突的问题。扩展性差强人意,一点也不优雅。

现实中,我们就碰到了这样的需求:实时汇总线上多台服务器的 Nginx 日志。Flume 立功了。

Flume 简介

Flume 是一个分布式,可靠高效的日志收集系统,它允许用户自定义数据传输模型,因此可扩展性也强。也有较强的容错和恢复机制. 以下是几个重要的概念

  • Event:Event 是 Flume 数据传输的基本单元。flume 以事件的形式将数据从源头传送到最终的目的。
  • Agent:Agent包含 Sources, Channels, Sinks 和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的。
  • Source:Source负责接收events,并将events批量的放到一个或多个Channels。
  • Channel:Channel位于 Source 和 Sink 之间,用于缓存进来的events,当Sink成功的将events发送到下一跳的channel或最终目的,events从Channel移除。
  • Sink:Sink 负责将 events 传输到下一跳或最终目的,成功完成后将events从channel移除。

  • Source 就有 Syslog Source, Kafka Source,HTTP Source, Exec Source Avro Source 等。
  • Sink 有 Kafka Sink, Avro Sink, File Roll Sink, HDFS Sink 等。
  • Channel 有 Memory Channel,File Channel 等

它提供了一个骨架,以及多种 Source, Sink, Channel, 让你设计合适的数据模型。事实上也可以多个 Flume 联动完成,就像地铁的车厢一样。

定义数据流模型

回到我们开头的场景,我们要将多台服务器的 Nginx 日志进行汇总分析,

分成两个 flume 来实现

  • Flume1 数据流是 Exec Source -> Memory Channel -> Avro Sink,部署在业务机器上
  • Flume2 数据流是 Avro Source -> Memory Channel -> FileRoll Sink

需要的准备

你需要安装

  • 下载 Flume
  • 安装 JavaSDk,并在下载解压之后的 conf/flume-env.sh,配置
1
2
# 我用的是oracle-java-8
export JAVA_HOME=/usr/lib/jvm/java-8-oracle/jre/
  • 思考你的数据流动模型,编写配置,如上文所说的Flume1, tail2avro.conf :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type=exec
agent.sources.s1.command=tail -F <Your File Path>
agent.sources.s1.channels=c1

agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=10000

agent.sinks.k1.type = avro
agent.sinks.k1.hostname = <Your Target Address>
agent.sinks.k1.port = <Your Target Port>
agent.sinks.k1.channel=c1

Flume2 中的 avro2file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = avro
agent.sources.s1.bind = <Your Address>
agent.sources.s1.port = <Your Port>
agent.sources.s1.channels = c1

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /data/log/ngxlog
# 滚动间隔
agent.sinks.k1.sink.rollInterval = 86400
agent.sinks.k1.channel = c1

agent.channels.c1.type = memory
# 队列里 Event 的容量
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.keep-alive = 60
  • 启动运行
1
2
3
4
5
6
7
# 启动flume1
bin/flume-ng agent -n agent -c conf -f conf/tail2avro.conf \
-Dflume.root.logger=WARN

# 启动flume2
in/flume-ng agent -n agent -c conf -f conf/avro2file.conf \
-Dflume.root.logger=INFO

参考

Redis 该选择哪种持久化配置

| Comments

这个标题或许会让你想起《黑客帝国》里经典的台词,你要选择蓝色药丸,还是红色药丸?

Redis 是我们重度使用的一个开源软件,对它的持久化配置做一番相对深入的总结,是值得的。目前它有两种主流的持久化存储方式 SnapShot 以及 AOF 。

什么是 Snapshot

Snapshot 将内存中数据以结构化的方式序列化到 rdb 文件中,是默认的持久化方式,便于解析引擎快速解析和内存实施。快照得由间隔时间,变更次数同时符合才会触发, 该过程中并不阻塞客户端请求,copy-on-write 方式也意味着极端情况下可能会导致实际数据2倍内存的使用量。它首先将数据写入临时文件,结束后,将临时文件重名为 dump.rdb。可以使用 redis-check-dump 用来检测完整性

只有快照结束后才会将旧的文件替换成新的,因此任何时候 RDB 文件都是完整的。如果在触发 snapshot 之前,server 失效。会导致上一个时间点之后的数据未能序列化到 rdb 文件,安全性上稍弱。

我们可手动执行 save 或 bgsave 命令让 redis 执行快照。两个命令的区别在于:

  • save 是由主进程进行快照操作,会阻塞其它请求;
  • bgsave 会通过 fork 子进程进行快照操作;

RDB 文件默认是经过压缩的二进制文件,占用的空间会小于内存中的数据,更加利于传输。设置如下,可以关闭快照功能

1
save ""

相关配置

1
2
3
4
5
6
7
8
# snapshot触发的时机,save <seconds> <changes>, 比如600秒有2个操作
save 600 2
# 当snapshot 时出现错误无法继续时,是否阻塞客户端变更操作 
stop-writes-on-bgsave-error yes 
# 是否启用rdb文件压缩,默认为 yes cpu消耗,快速传输  
rdbcompression yes  
# rdb文件名称  
dbfilename dump.rdb  

什么是 AOF

Append-only file,将 操作 + 数据 以格式化指令的方式追加到操作日志文件的尾部,在 append 操作返回后, 已经写入到文件或者即将写入,才进行实际的数据变更,日志文件保存了历史的操作过程;当 server 需要数据恢复时,可以直接回放此日志文件,即可还原所有的操作过程。 如果你期望数据更少的丢失,那么可以采用 AOF 模式。可以用 redis-check-aof 检测文件是否完整。

AOF 就是日志会记录变更操(例如:set/del等),会导致AOF文件非常的庞大,意味着server失效后,数据恢复的过程将会很长;事实上,一条数据经过多次变更,将会产生多条AOF记录,其实只要保存当前的状态,历史的操作记录是可以抛弃的, 由此催生了 AOF ReWrite。

什么是 AOF Rewrite

其实是压缩 AOF 文件的过程,Redis 采取了类似 Snapshot 的方式:基于 copy-on-write,全量遍历内存中数据,然后逐个序列到 aof 文件中。因此 AOF Rewrite 能够正确反应当前内存数据的状态, Rewrite 过程中,新的变更操作将仍然被写入到原 AOF 文件中,同时这些新的变更操作也会被收集起来, 并不阻塞客户端请求。

相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
##只有在yes下,aof重写/文件同步等特性才会生效  
appendonly no  
  
##指定aof文件名称  
appendfilename appendonly.aof  
  
##指定aof操作中文件同步策略,有三个合法值:always everysec no,默认为everysec  
appendfsync everysec  

##在aof-rewrite期间,appendfsync 是否暂缓文件同步,no 表示不暂缓,yes 表示暂缓,默认为no  
no-appendfsync-on-rewrite no  
  
##aof文件rewrite触发的最小文件尺寸 只有大于此aof文件大于此尺寸是才会触发rewrite,默认64mb,建议512mb  
auto-aof-rewrite-min-size 64mb  
  
##相对于上一次rewrite,本次rewrite触发时aof文件应该增长的百分比
auto-aof-rewrite-percentage 100  

appendfsync 方式:

  • always:每一条 aof 记录都立即同步到文件,这是最安全的方式,但是更多的磁盘操作和阻塞延迟,IO 开支较大。
  • everysec:每秒同步一次,性能和安全也是redis推荐的方式。如果服务器故障,有可能导致最近一秒内aof记录丢失。
  • no:redis并不直接调用文件同步,而是交给操作系统来处理,操作系统可以根据buffer填充情况等择机触发同步;性能较好,在物理服务器故障时,数据丢失量会因OS配置有关。

选择哪种药丸

  • AOF更安全,可将数据及时同步到文件中,但需要较多的磁盘IO,AOF文件尺寸较大,文件内容恢复相对较慢, 也更完整。
  • Snapshot,安全性较差,它是正常时期数据备份及 master-slave 数据同步的最佳手段,文件尺寸较小,恢复数度较快。

主从架构的环境下的选择

  • 通常 master 使用AOF,slave 使用 Snapshot,master 需要确保数据完整性,slave 提供只读服务.
  • 如果你的网络稳定性差, 物理环境糟糕情况下,那么 master, slave均采取 AOF,这个在 master, slave角色切换时,可以减少时间成本;