Follower me on GitHub
×

博客更新通知

感谢一直支持我的朋友,目前在做一个自己的博客项目。几个月后会有新版本的博客,新版本博客将主要涉及python,ruby,lisp,算法,devops,emacs,github开源项目等方面内容,敬请关注

前言

原来的一位同事@炮哥, 昨天在QQ问我: “想请教下一个线程获得lock之后,也可能被其他的线程释放掉?这个是因为线程之间的资源是共享的吗?这样的话普通的thread lock 都是不安全的了?”. 我第一反应就是怎么可能:’谁加锁谁解锁呗,别的线程怎么能解锁?’

炮哥发来一段python官网的文档说明:

A factory function that returns a new primitive lock object. Once a thread has acquired it, subsequent attempts to acquire it block, until it is released; any thread may release it.

最有最后一句前是分号. 简单地说,一个线程获取锁, 以后的尝试获取都会被阻塞,除非它会释放. 但是同时其他其他线程可以释放

好,写个例子验证下:

import threading
import time

lock = threading.Lock()

def lock_holder(lock):
    print('Starting')
    while True:
        lock.acquire()
        print('Holding')
        time.sleep(100)
        print('Sleep done')

def lock_release(lock):
    time.sleep(1) # 保证顺序
    lock.release()
    print('Release it')


holder = threading.Thread(target=lock_holder, args=(lock,), name='LockHolder')
holder.setDaemon(True)
holder.start()

#lock_release(lock)
release = threading.Thread(target=lock_release, args=(lock,), name='release')
release.start()

holder = threading.Thread(target=lock_holder, args=(lock,), name='LockHolder')
holder.setDaemon(True)
holder.start()

奇迹发生了…. 线程b果然可以释放线程a的锁,颠覆人生观啊.

PS: 但是Rlock不会被其他线程释放,因为它记录该线程的所有者

前言

今天是在Ad的最后一天,本来准备了一个分享.关于业务中一些吐槽和我一些trick的用法, 有兴趣的可以下载speakerdeck

主题

  1. celery celery2/celery3, py-amqp, kombu的用法, celery和djangocelery的集合
  2. expect 使用expect自动登录复杂的服务器
  3. mapreduce 一个并行处理文件的例子,说明使用python跑mapreduce多么厉害
  4. portforward 端口转发
  5. restapi 我眼中的restapi(pdf)
  6. tornado 使用tornado一部非阻塞

演示的tmux脚本:

#!/bin/bash
SESSION=$USER
COMMAND='http Space http://localhost:8000/sleep'

tmux new-session -d -s $SESSION

tmux new-window -t $SESSION -n 'Logs'
tmux split-window -h
tmux select-pane -t 0
tmux send-keys $COMMAND C-m
tmux select-pane -t 1
tmux send-keys $COMMAND C-m
tmux split-window -v
tmux send-keys $COMMAND C-m
tmux select-pane -t 0
tmux split-window -v
tmux send-keys $COMMAND C-m
# Attach to session
tmux attach-session -t $SESSION

前言

我以前在学习python模块的时候,曾经翻译pymotw的文章,其实还是有抄袭的嫌疑,从最近开始逐渐直接阅读python标准库源码, 收获颇多. 我现在不愿意教一些从网上或者书里提到的知识点,而更愿意根据我工作中常见的需求去挖掘对应的python的解法.也是在过程中对一些东西有了比较深的理解. 这个ppt是从像黑客一样使用 Linux 命令行获得的灵感. 然后角度为, 还用到了webfonts娃娃体^.^

PS: 特别推荐github上看到的”雨痕”的学习笔记. 建议大家都好好看看.

找到它

Expert-Python 或者直接下载代码: github

但是注意我的字体内嵌项目里面, 请注意流量,避免移动设备直接访问或者强制刷新

目录

  1. XX不理解python竟然没有end….
  2. 设置全局变量
  3. 字符串格式化
  4. 操作列表
  5. 操作字典
  6. 字典视图
  7. vars
  8. from future import unicode_literals
  9. from future import absolute_import
  10. 不是支持了绝对引入,而是拒绝隐式引入
  11. 我靠,我的需求呢? – 在很多开源项目是拒绝你第一次的隐式用法的,
  12. 一个关于编码的问题
  13. 原因是: encoding_example里面没有对文字自动转化为unicode,默认是ascii编码
  14. super 当子类调用父类属性时一般的做法是这样
  15. super的一种用法
  16. 假如不用super会这么惨
  17. 手写一个迭代器
  18. 标准迭代器
  19. 生成器
  20. 斐波那契数列
  21. 其实yield和协程关系很密切
  22. 来个回调(阻塞的)
  23. 来个回调(异步的)
  24. 看到这里, 就得说说contextmanager
  25. 包导入
  26. 包构建__all__
  27. 包构建__path__
  28. 静态方法和类方法的区别
  29. 静态方法和类方法的区别其实是在这里
  30. __slots__
  31. Packaging Tools的未来
  32. wheel(即将替代Eggs的二进制包格式)的优点
  33. 装饰器
  34. 给函数的类装饰器
  35. 给类的函数装饰器
  36. 带参数的装饰器
  37. @property
  38. @property的另外使用方法
  39. 元类是什么
  40. 模拟生成一个类
  41. 元类: __metaclass__(实现前面的Hello类)
  42. 一个难懂的元类
  43. 描述符
  44. 模块: itertools
  45. 模块: collections(一)
  46. 模块: collections(二)
  47. 模块: collections(三)
  48. 模块: collections(四)
  49. operator模块(一)
  50. operator模块(二)
  51. operator模块(三)
  52. functools模块之partial
  53. functools模块之wraps
  54. functools模块之cmp_to_key
  55. functools模块之total_ordering
  56. 开发陷阱(一) 可变默认参数
  57. 开发陷阱(二) 闭包变量绑定
  58. 开发陷阱(二) 闭包应该的用法
  59. 在合适的地方用合适的技巧
  60. 不是它不好,而是你没有用好
  61. ipython的技巧(一)
  62. ipython的技巧(二)
  63. 联系方式

UPDATE 2014.04.11

今天下午分享了这个ppt. 并且用quicktime录像. 想听的可以从百度网盘下载或者在线看(793.6M). 时长2小时零一分.

中间有个列表去重. 有同学说去重后无法保证第一次出现重复数据位置的顺序.

刚才想起来试了一下:

>>> l = [1, 2, 4, 7, 2, 1, 8, 6, 1]
    >>> list(set(l))
    [1, 2, 4, 6, 7, 8]
    >>> {}.fromkeys(l).keys()
    [1, 2, 4, 6, 7, 8]  # 注意这个和上面结果是一样的,也就是内部实现的去重原理相同
    >>> l = ['a', 'b', 'c', 'd', 'b', 'a']
    >>> list(set(l))
    ['a', 'c', 'b', 'd']
    >>> {}.fromkeys(l).keys()
    ['a', 'c', 'b', 'd']
    >>> from collections import OrderedDict
    >>> OrderedDict().fromkeys(l).keys()  # 只能使用这样的方法实现保证顺序的实现
    [1, 2, 4, 7, 8, 6]                                 # 感谢@杨博的提醒

新的PYPI的DEMO: http://pypi-preview.a.ssl.fastly.net

前言

在豆瓣开源项目里面有个graph-index, 提供监控服务器的状态的目录索引,基于graph-explorer. 类似衍生物很多,就包括我要说的本文用到的项目.先看看我的测试环境的几个截图

一些关键词说明

  1. graphite-web # graphite组件之一, 提供一个django的可以高度扩展的实时画图系统
  2. Whisper # graphite组件之一, 实现数据库存储. 它比rrdtool要慢,因为whisper是使用python写的,而rrdtool是使用C写的。然而速度之间的差异很小
  3. Carbon # 数据收集的结果会传给它, 它会解析数据让它可用于实时绘图. 它默认可会提示一些类型的数据,监听2003和2004端口
  4. Diamond # 他是一个提供了大部分数据收集结果功能的结合,类似cpu, load, memory以及mongodb,rabbitmq,nginx等指标.这样就不需要我大量的写各种类型,因为它都已经提供,并且它提供了可扩展的自定义类型(最后我会展示一个我自己定义的类型)
  5. grafana # 这个面板是基于node, kibana,并且可以在线编辑. 因为是kibana,所以也用到了开元搜索框架elasticsearch

PS: 其他工具可以参考这里Tools That Work With Graphite

原理解析

我没有看实际全部代码,大概的流程是这样的:

  1. 启动Carbon-cache等待接收数据(carbon用的是twisted)
  2. 启动graphite-web给grafana提供实时绘图数据api
  3. 启动grafana,调用graphite-web接口获取数据展示出来
  4. Diamond定期获取各类要监测的类型数据发给carbon(默认是5分钟,默认一小时自动重载一次配置)

实现我这个系统需要做的事情

安装graphite相关组件(我这里用的是centos)
yum --enablerepo=epel install graphite-web python-carbon -y
安装grafana需要的组件
# 增加elasticsearch的repo:
sudo  rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
$cat /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-1.0]
name=Elasticsearch repository for 1.0.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.0/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
sudo yum install nginx nodejs npm java-1.7.0-openjdk elasticsearch -y
下载Diamond和grafana
git clone https://github.com/torkelo/grafana
cd grafana
sudo npm install
sudo pip install django-cors-headers configobj # 这可能因为我环境中已经有了一些模块,看缺什么安装什么
git clone https://github.com/BrightcoveOS/Diamond
cd Diamond

##### 开始修改配置

  1. 添加cors支持

在/usr/lib/python2.6/site-packages/graphite/app_settings.py:

INSTALLED_APPS里面添加corsheaders, MIDDLEWARE_CLASSES里面添加’corsheaders.middleware.CorsMiddleware’

  1. 使用nginx使用grafana

在nginx.conf 添加类型的一段配置

server {
  listen                *:80 ;

  server_name           monitor.dongwm.com; # 我用了虚拟主机
  access_log            /var/log/nginx/kibana.myhost.org.access.log;

  location / {
    add_header 'Access-Control-Allow-Origin' "$http_origin";
    add_header 'Access-Control-Allow-Credentials' 'true';
    root  /home/operation/dongwm/grafana/src;
    index  index.html  index.htm;
  }

  location ~ ^/_aliases$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
  }
  location ~ ^/_nodes$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
  }
  location ~ ^/.*/_search$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
  }
  location ~ ^/.*/_mapping$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
  }

  # Password protected end points
  location ~ ^/kibana-int/dashboard/.*$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
    limit_except GET {
      proxy_pass http://127.0.0.1:9200;
      auth_basic "Restricted";
      auth_basic_user_file /etc/nginx/conf.d/dongwm.htpasswd;
    }
  }
  location ~ ^/kibana-int/temp.*$ {
    proxy_pass http://127.0.0.1:9200;
    proxy_read_timeout 90;
    limit_except GET {
      proxy_pass http://127.0.0.1:9200;
      auth_basic "Restricted";
      auth_basic_user_file /etc/nginx/conf.d/dongwm.htpasswd;
    }
  }
  1. 修改grafana的src/config.js:

graphiteUrl: “http://”+window.location.hostname+”:8020”, # 下面会定义graphite-web启动在8020端口

  1. 修改Diamond的配置conf/diamond.conf
cp conf/diamond.conf.example conf/diamond.conf

主要修改监听的carbon服务器和端口,以及要监控什么类型的数据,看我的一个全文配置

################################################################################
# Diamond Configuration File
################################################################################

################################################################################
### Options for the server
[server]

# Handlers for published metrics.
handlers = diamond.handler.graphite.GraphiteHandler, diamond.handler.archive.ArchiveHandler

# User diamond will run as
# Leave empty to use the current user
user =

# Group diamond will run as
# Leave empty to use the current group
group =

# Pid file
pid_file = /home/dongwm/logs/diamond.pid # 换了pid的地址,因为我的服务都不会root启动

# Directory to load collector modules from
collectors_path = /home/dongwm/Diamond/src/collectors # 收集器的目录,这个/home/dongwm/Diamond就是克隆代码的地址

# Directory to load collector configs from
collectors_config_path = /home/dongwm/Diamond/src/collectors

# Directory to load handler configs from
handlers_config_path = /home/dongwm/Diamond/src/diamond/handler

handlers_path = /home/dongwm/Diamond/src/diamond/handler

# Interval to reload collectors
collectors_reload_interval = 3600 # 收集器定期会重载看有没有配置更新

################################################################################
### Options for handlers
[handlers]

# daemon logging handler(s)
keys = rotated_file

### Defaults options for all Handlers
[[default]]

[[ArchiveHandler]]

# File to write archive log files
log_file = /home/dongwm/logs/diamond_archive.log

# Number of days to keep archive log files
days = 7

[[GraphiteHandler]]
### Options for GraphiteHandler

# Graphite server host
host = 123.126.1.11

# Port to send metrics to
port = 2003

# Socket timeout (seconds)
timeout = 15

# Batch size for metrics
batch = 1

[[GraphitePickleHandler]]
### Options for GraphitePickleHandler

# Graphite server host
host = 123.126.1.11

# Port to send metrics to
port = 2004

# Socket timeout (seconds)
timeout = 15

# Batch size for pickled metrics
batch = 256

[[MySQLHandler]]
### Options for MySQLHandler

# MySQL Connection Info 这个可以你的会不同
hostname    = 127.0.0.1
port        = 3306
username    = root
password    =
database    = diamond
table       = metrics
# INT UNSIGNED NOT NULL
col_time    = timestamp
# VARCHAR(255) NOT NULL
col_metric  = metric
# VARCHAR(255) NOT NULL
col_value   = value

[[StatsdHandler]]
host = 127.0.0.1
port = 8125

[[TSDBHandler]]
host = 127.0.0.1
port = 4242
timeout = 15

[[LibratoHandler]]
user = user@example.com
apikey = abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz01

[[HostedGraphiteHandler]]
apikey = abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz01
timeout = 15
batch = 1

# And any other config settings from GraphiteHandler are valid here

[[HttpPostHandler]]

### Urp to post the metrics
url = http://localhost:8888/
### Metrics batch size
batch = 100


################################################################################
### Options for collectors
[collectors]
[[TencentCollector]] # 本来[collectors]下试没有东西的,这个是我定制的一个类型
ttype = server
[[MongoDBCollector]] # 一般情况,有一些类型是默认enabled = True,也就是启动的,但是大部分是默认不启动《需要显示指定True
enabled = True
host = 127.0.0.1 # 每种类型的参数不同
[[TCPCollector]]
enabled = True
[[NetworkCollector]]
enabled = True
[[NginxCollector]]
enabled = False # 没开启nginx_status 开启了也没用
[[ SockstatCollector]]
enabled = True
[[default]]
### Defaults options for all Collectors

# Uncomment and set to hardcode a hostname for the collector path
# Keep in mind, periods are seperators in graphite
# hostname = my_custom_hostname

# If you prefer to just use a different way of calculating the hostname
# Uncomment and set this to one of these values:

# smart             = Default. Tries fqdn_short. If that's localhost, uses hostname_short

# fqdn_short        = Default. Similar to hostname -s
# fqdn              = hostname output
# fqdn_rev          = hostname in reverse (com.example.www)

# uname_short       = Similar to uname -n, but only the first part
# uname_rev         = uname -r in reverse (com.example.www)

# hostname_short    = `hostname -s`
# hostname          = `hostname`
# hostname_rev      = `hostname` in reverse (com.example.www)

# hostname_method = smart

# Path Prefix and Suffix
# you can use one or both to craft the path where you want to put metrics
# such as: %(path_prefix)s.$(hostname)s.$(path_suffix)s.$(metric)s
# path_prefix = servers
# path_suffix =

# Path Prefix for Virtual Machines
# If the host supports virtual machines, collectors may report per
# VM metrics. Following OpenStack nomenclature, the prefix for
# reporting per VM metrics is "instances", and metric foo for VM
# bar will be reported as: instances.bar.foo...
# instance_prefix = instances

# Default Poll Interval (seconds)
# interval = 300

################################################################################
### Options for logging
# for more information on file format syntax:
# http://docs.python.org/library/logging.config.html#configuration-file-format

[loggers]

keys = root

# handlers are higher in this config file, in:
# [handlers]
# keys = ...

[formatters]

keys = default

[logger_root]

# to increase verbosity, set DEBUG
level = INFO
handlers = rotated_file
propagate = 1

[handler_rotated_file]

class = handlers.TimedRotatingFileHandler
level = DEBUG
formatter = default
# rotate at midnight, each day and keep 7 days
args = ('/home/dongwm/logs/diamond.log', 'midnight', 1, 7)

[formatter_default]

format = [%(asctime)s] [%(threadName)s] %(message)s
datefmt =
启动相关服务
sudo /etc/init.d/nginx reload
sudo /sbin/chkconfig --add elasticsearch
sudo service elasticsearch start
sudo service carbon-cache restart
sudo python /usr/lib/python2.6/site-packages/graphite/manage.py runserver 0.0.0.0:8020 # 启动graphite-web到8020端口
在每个要搜集信息的agent上面安装Diamond,并启动:
cd /home/dongm/Diamond
python ./bin/diamond --configfile=conf/diamond.conf

# PS: 也可以添加 -l -f在前台显示
自定义数据搜集类型,也就是上面的TencentCollector
# coding=utf-8 

"""
获取腾讯微博爬虫的业务指标
"""

import diamond.collector
import pymongo
from pymongo.errors import ConnectionFailure


class TencentCollector(diamond.collector.Collector): # 需要继承至diamond.collector.Collector
    PATH = '/home/dongwm/tencent_data'
    
    def get_default_config(self):
        config = super(TencentCollector, self).get_default_config()
        config.update({
            'enabled':  'True',
            'path':     'tencent',
            'method':   'Threaded',
            'ttype':    'agent' # 服务类型 包含agent和server
        })
        return config

    def collect(self):
        ttype = self.config['ttype']
        if ttype == 'server':
            try:
                db = pymongo.MongoClient()['tmp']
            except ConnectionFailure:
                return
            now_count = db.data.count()
            try:
                last_count = db.diamond.find_and_modify(
                    {}, {'$set': {'last': now_count}}, upsert=True)['last']
            except TypeError:
                last_count = 0
            self.publish('count', now_count)
            self.publish('update', abs(last_count - now_count))
        if ttype == 'agent':
            # somethings..........
添加你要绘图的类型. 这个就是打开grafana, 添加不同的row.给每个添加panel.选择metric的类型就好了

前言

过年在家无聊, 发现个挺有意思的项目: linux-dash,就是使用Twitter的Bootstrap做的管理模板,可以显示服务器信息, 负载, 内存,进程,硬盘,用户,安装/未安装的软件信息,网卡IP,网速,以及网络状态,在线用户等. 但是有2个问题:

  1. 它是php的…
  2. 它获取信息都是通过shell下得系统命令

我就用django写了一个python版的django-linux-dash:本来想用flask,结果被人用了,但是问题也是通过subprocess调用命令获取数据,这个轮子有以下优点:

  1. 不仅支持linux,也支持OS X
  2. 使用psutil, 项目完全不调用shell命令

安装和使用

需要django>=1.4以及psutils模块

$pip install/easy_install django
$pip install/easy_install psutil

PS: OS X 还需要netifaces模块用于获取网卡ip

$pip install/easy_install netifaces

启动:

$cd /You/install/path
$git clone https://github.com/dongweiming/django-linux-dash && cd django-linux-dash
$python manage.py runserver 0.0.0.0:8000

打开浏览器输入 http://localhost:8000 就可以看见了…

TODO:

  1. 目前还没有添加测速功能,因为我希望不要一直傻瓜式的下载某文件,根据用时计算平均值,因为第一它需要时间才会显示个速度,其次是不实时不能循环实时
  2. 添加更多信息模块
  3. 增加用户登陆和权限控制 …

前言

mapreduce在我的理解里一直都是java等语言的专利,介于python乃至于pypy的性能局限, 一直没想过用python写分布式任务,最多就是多workers从消息队列取任务执行这样,但是最近一件事真的颠覆 了我对python的认识.

先说说起因

某天分享sed和awk,领导突发奇想让我用一些顾问的实际工作需要去我们的大量数据里面获取想要的数据的需求作为一些演示的例子.其中有这样一个需求(我去掉实际一些专业晦涩的用语,用实际的内容来表达):

需求
1. 有大量的gz压缩文件, 找到其中某2天的数据, 每一行都是一条实际数据
2. 需要解压缩每个文件,遍历每行找到用逗号隔开的第21列为16233,23列为27188的行. 以第2列为键计算符合的数量
3. 在全部统计结果里面根据值计算符合的键的数量: 比如{'a':2, 'b':1, 'c':1},结果就是{1:2, 2:1},也就是2次的有2,1次的只有一个
分析

一上来真的想用awk来搞.但是和其他同事一聊,有几个难点:

1. 2天数据总量在400G以上,awk还要保留2次哈希结果-不可能用awk
2. python,据同事经验说:只是解压缩这些小文件后读取什么都不做也大概1天多的时间,完全不能忍
3. 数据还没有放到hadoop, 没有其他更好更快的方法
解题思路:
  1. 最初我想做成这样:

    1. 把需要处理的这些压缩文件放到队列里面
    2. 启动多进程出队列里面获取要处理的文件,执行,把符合的结果放到共享变量叠加
    3. 计算完成后从共享变量里面或者数据在生成上面第三条的结果

但是今天讲的是python得mapreduce,也就是我后续的版本,它源于伟大的Doug Hellmann的Implementing MapReduce with multiprocessing

#!/usr/bin/env python
#coding=utf-8
# python mapreduce 跑数实现
# Author: Dongweiming
import gzip
import time
import os
import glob
import collections
import itertools
import operator
import multiprocessing


class AdMapReduce(object):

    def __init__(self, map_func, reduce_func, num_workers=None):
        '''
        num_workers: 不指定就是默认可用cpu的核数
        map_func: map函数: 要求返回格式类似:[(a, 1), (b, 3)]
        reduce_func: reduce函数: 要求返回格式类似: (c, 10)
        '''
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        '''调用类的时候被触发'''
        # 其实都是借用multiprocessing.Pool.map这个函数, inputs是一个需要处理的列表,想想map函数
        # chunksize表示每次给mapper的量, 这个根据需求调整效率
        map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        # itertools.chain是把mapper的结果链接起来为一个可迭代的对象
        partitioned_data = self.partition(itertools.chain(*map_responses))
        # 大家想,上面的就是[(a, [1,2]), (b, [2,3]),列表中的数就是当时符合的次数,reduce就是吧列表符合项sum
        reduced_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduced_values


def mapper_match(one_file):
    '''第一次的map函数,从每个文件里面获取符合的条目'''
    output = []
    for line in gzip.open(one_file).readlines():
        l = line.rstrip().split(',')
        if int(l[20]) == 16309 and int(l[22]) == 2656:
            cookie = l[1]
            output.append((cookie, 1))
    return output


def reduce_match(item):
    '''第一次的reduce函数,给相同的key做统计'''
    cookie, occurances = item
    return (cookie, sum(occurances))


def mapper_count(item):
    '''第二次mapper函数,其实就是把某key的总数做键,但是值标1'''
    _, count = item
    return [(count, 1)]


def reduce_count(item):
    '''第二次reduce函数'''
    freq, occurances = item
    return (freq, sum(occurances))


if __name__ == '__main__':
    start = time.time()
    input_files = glob.glob('/datacenter/input/2013-12-1[01]/*')
    mapper = AdMapReduce(mapper_match, reduce_match)
    cookie_feq = mapper(input_files)
    mapper = AdMapReduce(mapper_count, reduce_count)
    cookie_feq = mapper(cookie_feq)
    cookie_feq.sort(key=operator.itemgetter(1))
    for freq, count in cookie_feq:
        print '{0}\t{1}\t{2}'.format(freq, count, freq*count)
    #cookie_feq.reverse()
    end = time.time()
    print 'cost:', end - start

后话

哇,看python做mapreduce也是可以这样优雅的, 我是用pypy跑下来,竟然只有了61分钟….

但是其实他只是借助mapreduce思想和多核的硬件基础,其实pool做的还是文件级别的处理.假如是少量的大文件,就未必有这样好的效果了.

我想很多时候这样的工作都可以交给这个Admapreduce类来做

前言

最近做一个关于sed和awk的分享,这里把源码开源:sed_and_awk,或者直接访问http://dongweiming.github.io/sed_and_awk. 我这个ppt基本覆盖90%以上的知识点.

一些说明

我测试例子都是在osx下,freebsd的sed和awk和gnu的都略有不同.甚至osx下得版本都不能使用,我会在注释中说明.

  • sed

    1. sed 通用
    2. /usr/local/bin/sed osx下编译的gnu sed
  • awk

    1. awk 通用
    2. gawk osx编译的gnu awk

前言

我想很多开发的同学都经历过这样的开发流程:

  • 本地修改代码
  • 把代码推到测试环境
  • 重启测试环境需要的服务
  • 本地看效果,改bug重复1-3
  • 功能稳定后上线

这个过程有什问题呢?

  • 假设你是一个做过运维或者能力很强的人,比如我这种,本地跑测试环境,但是假如一个新人,或者对测试环境中的某些 部分不是很了解的人, 甚至需要和生产环境完全一样的条件下, 本地可能就不好使了. 那么这需要一个测试服务器

  • 首先你每次修改代码, push ,重启环境都需要你登陆测试环境,至少不够自动化.然后退回本地看效果,这个过程有点浪费时间和经理
  • 其次是你可能有好几个项目,他们之间可能都没什么共通点. 你需要多个测试环境
  • 当然你可以写几个脚本,为你每个测试环境写一个东西去自动化这些,未尝不可. 只是需要重复造很多轮子
  • 假如为了安全有跳板机,你需要登陆跳板机才能跳到你的测试服务器,你可能要写很复杂的expect脚本

然后是我认为最重要的:

凡是屁大点事就放个deamon的运维都是耍流氓, 就拿小屁几台服务器还搞神马salt, ansible之类的事情,真是太无聊了.

这些东西帮助你做了很多事情,但是会让你变得更懒.而且重要的是-它们写的并不一定只符合你的需要或者就不符合你的需要. 我喜欢简单粗暴的实现,最近在看fabric的代码, 作为做过op,也给salt贡献过代码的我,写了这个东西: gentle, 帮助我自动化提交代码到我的测试环境. 这个东西是我认为符合我需要,或者大部分开发同学需要的小东西,基于fabric, docopt 和yaml.

我的工作的一些特点

我负责几个项目, 它们有以下特点

  • 项目在不同的机房, 有完整的测试环境和相关数据
  • 项目依赖的服务基本不同,比如有的使用了supervisor, 有的是程序fork后退出了父进程;有的使用了nginx+uwsgi,有的就是nginx+服务等.
  • 项目之间需要的依赖应用不同,且启动顺序有区别. 这个很好理解, 启动需要先启动A,再启动B,才能启动C
  • 项目有的需要登陆跳板机

我以前的个人的开发习惯和流程

  • 我有一个专门的存放服务配置的目录, 后缀是ip或者项目的名字. git版本库, 每次更新后上传到测试环境
  • 我有专门的op PATH, 做了很多alias, 都是一些python或者shell的脚本,用来同步测试环境,登陆测试环境撑起服务的脚本

看起来以前用的也不错. 但是gentle能怎么样提高呢?

gentle的开发流程

  • 切换到你要开发的目录
  • 初始化这个目录,其实就是在当前目录增加一个.gentle.yaml
  • 根据你的需要配置测试环境账号ip密码, 想要同步的目录, 需要重启的服务和优先级已经命令
  • 以后每次只需在这个目录下,使用gt publish 或者更懒 gt p, 他就会帮你自动rsync然后重启相关服务.

具体使用可以去我的github或者readthedocs.org

安装和依赖

我已经放到了pypi, 你可以使用pip或者easy_install

sudo pip install gentle

这样在系统环境下会有一个gt命令

fabric有2个对于项目更新的函数,一个是rsync_project,一个是upload_project, upload是把项目压缩在服务器上解压缩,实际上很浪费时间,但是rsync_project不能使用env.password,需要手动输入一次密码,都很不爽,我找到一个解决办法就是sshpass, 帮助自动输入rsync的密码, 我也给fabric提了PR. 我的项目使用了我个人的改进版的rsync_project

如果你是ubuntu 直接:

sudo apt-get install sshpass

其他操作系统可以在这里下载 http://sourceforge.net/projects/sshpass/ 然后编译

tar zxvf sshpass-1.05.tar.gz && cd sshpass-1.05 && ./configure &&
make && sudo make install

一个我的测试环境的例子, 我加注释说明

host: 192.168.3.11 #测试服务器, 格式是user@host:port
password: dongwmspassword #登录服务器需要的密码
gateway: #可以不存在,中间服务器,格式是user@host:port
gatewaypassword: #中间服务器(jump跳板机)的密码
rsync: #这个操作是rsync
  lpath: $ROOT #你可以使用全路径, $ROOT表示当前路径,本地目录
  rpath: /opt/tornado # 测试环境的目录
services: # 每个段落就是一个服务,这里有nginx和supervisor
  nginx:
    command: kill -9 `ps -ef |grep nginx|grep -v grep|awk '{print $2}'` && /opt/nginx/sbin/nginx #启动的命令
    lpath: /usr/local/etc/nginx/nginx.conf #nginx.conf的本地地址
    priority: 1 #优先级越高越先执行
    rpath: /opt/nginx/conf/nginx.conf #测试环境的nginx.conf路径
    sudo: false # 因为这个环境很宽松 不需要sudo
    user: #sudo使用的用户,默认是登陆的用户
  supervisor:
    command: supervisorctl -c /etc/supervisor/supervisord.conf reload
    lpath: /Users/dongwm/settings/supervisord.conf.31
    priority: 2
    rpath: /etc/supervisor/supervisord.conf
    sudo: false
    user:
username: root #登陆服务器的默认用户, 你可以在使用host的时候指定用户

欢迎给我issue和PR

TODO

  • 因为很多人会用到跳板机,我想加入跳板机的用法, 看起来就像直接在本地操作远程一样 # 更新2013-12-03 完成
  • 设置输出是否隐藏,目前保留就是为了让我看到执行的过程
  • 设置支持多测试环境一起部署,或者说让生成环境的部署一样方便

前言

以前一直使用gentoo+gnome的方法使用,说实话,gentoo做个人桌面是在很一般, 尤其是跑起来 firefox, 虽然依然性能要比其他linux桌面发行版要好,可是我自己清楚. 尤其是gnome的很多功能对我个人完全没有用处,还浪费了内存. 最近神奇的同事@刘喆告诉我原来还有个神奇的 stumpwm, 基于common lisp的桌面管理. 我想很多人听过awesome,但是是lua的版本. 我当然希望用我熟悉的语言.这是一篇安装配置过程的博客

安装遇到了一些问题

本来一般的软件都是包管理emerge stumpwm 就好啦,但是我安装后依然找不到stumpwm, 在手动编译stumpwm的过程发现很多奇葩报错. 所以连sbcl也编译了一个

安装过程

# 安装sbcl到/usr/local
wget http://downloads.sourceforge.net/project/sbcl/sbcl/1.1.13/sbcl-1.1.13-source.tar.bz2
tar jxvf sbcl-1.1.13-source.tar.bz2
cd sbcl-1.1.13
sh make.sh
unset SBCL_HOME
unset INSTALL_ROOT
sudo sh install.sh

#使用commonlisp的包管理工具quicklisp.lisp

curl -O http://beta.quicklisp.org/quicklisp.lisp
sbcl --load quicklisp.lisp
(quicklisp-quickstart:install)
(ql:add-to-init-file)
# 还需要以下2个依赖
(ql:quickload "clx")
(ql:quickload "cl-ppcre")

# 可选安装swank
(ql:quickload "swank")
(ql:quickload "quicklisp-slime-helper")

# 退出
(quit)

# 下载stumpwm

git clone git://git.savannah.nongnu.org/stumpwm.git
cd stumpwm && ./autogen.sh && ./configure && make
PS: 这个make时候就算报错,也没有太大关系

#生成2进制文件 再次进入sbcl交互模式

(asdf:oos 'asdf:load-op :stumpwm)
(sb-ext:save-lisp-and-die "stumpwm" :executable t
    :toplevel #'(lambda () (stumpwm:stumpwm ":0")))

# 拷贝文件到PATH

sudo cp -rp stumpwm /usr/bin/

# 设置启动 注意是你的普通用户

$ echo "exec stumpwm" >> ~/.xinitrc

# 这样你就可以使用startx启动啦

stumpwm的命令都是ctrl+t 开头的

前言

自从发了上次的文章使用celery之深入celery配置, 有一些网友再问我怎么让celery跑起来. 其实说来也是,celery在新手眼里真的是比较重量级,不好懂,今天先让他跑起来吧 本文大部分代码和使用方法都可以在celery官网看到

我想要的效果

我想实现一个定时任务, 每3个小时的12分启动,假如是定时任务大概是这样的:

12 */3 * * * python /where/is/the/path/that.py

选择MQ

使用消息队列其实就是为了给任务一个时序,保证任务消息不丢失,想想你的一个任务是关乎公司核心业务,犹豫某种原因失败或者丢失怎么办? celery就需要这个消息的存储,我这里还是选择rabbitmq mongodb,redis都无所谓 只是存储的位置的问题. 选择其他的工具没有远程控制和监控

写法就是:

BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/vhost'

其中可以这样解析

amqp://user:password@hostname:port/vhost

vhost是命名空间,就像网站的子域名,在这里由于权限控制我们需要先创建账号和密码

$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost myvhost
$ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

编写tasks.py脚本

from celery import Celery

app = Celery('tasks', broker='amqp://myuser:mypassword@localhost:5672/vhost')

@app.task
def add(x, y):
    return x + y

#### 简单的使用

$celery -A tasks worker --loglevel=debug

-A指定的就是任务的程序 tasks.py worker表示他是一个执行任务角色. 后面的记录日志类型,默认是info

这个时候,你可以在当前目录下使用python交互模式生成一个任务

>>> from tasks import add
>>> add.delay(4, 4)

这个时候可以看见上面的日志里面多了一些消息,然后里面多了这个任务的信息,比如下面这样:

[2013-11-24 17:11:59,369: INFO/MainProcess] Received task: tasks.add[f27994b0-3628-43a1-b136-540a360e3d64]
[2013-11-24 17:11:59,371: INFO/MainProcess] Task tasks.add[f27994b0-3628-43a1-b136-540a360e3d64] succeeded in 0.00102571400021s: 8

可以看见你的任务被执行了

假如我使用python的包, 就像一个应用,让代码结构化一些

$tree proj
proj
├── __init__.py
├── celery.py
└── tasks.py
$cat proj/celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
              broker='amqp://myuser:mypassword@localhost:5672/vhost',
              backend='amqp://',
              include=['proj.tasks'])
app.conf.update(CELERY_TASK_RESULT_EXPIRES=3600,)
if __name__ == '__main__':
    app.start()

上面的broker就是消息存储的地址 backend是存储任务执行情况的,比如正在执行,执行失败, 已经执行结果. include表示执行的任务的代码都放在哪个程序里面,比如这里的proj.tasks就是proj/tasks.py

$cat proj/tasks.py
from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y

其中的app.task是一个装饰器, 你可以在tasks.py里面加很多函数,但是celery只会找带这个装饰器的函数当成一种任务去执行 你可以有多个这样的脚本,只要在上面的celery.py的include的列表中指定

好吧 我们可以这样启动

$celery worker --app=proj -l info

proj 就是我们刚才应用的项目目录

给我们的项目任务放到特定的队列

可能你有很多的任务,但是你希望某些机器跑某些任务, 你可以希望有些任务优先级比较高,而不希望 先进先出的等待. 那么需要引入一个队列的问题. 也就是说在你的broker的消息存储里面有一些队列,他们并行运行,但是worker只从对应 的队列里面取任务.

我们要修改配置

$cat proj/celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
              broker='amqp://myuser:mypassword@localhost:5672/vhost',
              backend='amqp://',
              include=['proj.tasks'])
app.conf.update(
    CELERY_ROUTES = {
            'proj.tasks.add': {'queue': 'hipri'},
                },
                )
if __name__ == '__main__':
    app.start()
celery -A proj worker -Q hipri #这个worker只处理hipri这个队列的任务

你会发现add这个函数任务被放在一个叫做’hipri’的队列里面,想要执行那么也需要改:

from proj.tasks import add
add.apply_async((2, 2), queue='hipri')

使用beat自动调度

想想吧. 目前还是交互模式去手动执行, 我们要是想crontab的定时生成和执行,那么就是celery beat干的事情

from __future__ import absolute_import

from datetime import timedelta
from celery import Celery

app = Celery('proj',
             broker='amqp://myuser:mypassword@localhost:5672/vhost',
             backend='amqp://',
              include=['proj.tasks'])

app.conf.update(
    CELERY_ROUTES = {
        'proj.tasks.add': {'queue': 'hipri'},
    },

    CELERYBEAT_SCHEDULE = {
        "add": {
                "task": "proj.tasks.add",
                "schedule": timedelta(seconds=10),
                "args": (16, 16)
                }, },
                )

if __name__ == '__main__':
    app.start()

注意发现了一个CELERYBEAT_SCHEDULE,里面的调度其实就是表示10秒生成一次,worker启动方法一样, 这里启动beat,他就是按时生成任务发到MQ里面,让worker取走去执行

celery -A proj beat

其实也可以在worker命令中加-B

celery -A proj worker -B -Q hipri -l debug

刚才的CELERYBEAT_SCHEDULE也可以使用crontab的风格,比如我说的没3小时的12分就可以这样:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
        "add": {
                "task": "tasks.add",
                "schedule": crontab(hour="*/3", minute=12),
                "args": (16, 16),
                },
            }