pykafka 写消息时速度极慢,只有 10 mgs / s 正常么? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
996635
V2EX    Kafka

pykafka 写消息时速度极慢,只有 10 mgs / s 正常么?

  •  
  •   996635 2016 年 9 月 7 日 8230 次点击
    这是一个创建于 3421 天前的主题,其中的信息可能已经有所发展或是发生改变。
    # coding:utf=8 from pykafka.client import KafkaClient import logging import json import time logging.basicConfig(level= logging.WARNING) produce_logger = logging.getLogge('prodrcer') def kafka(use_rdkafka=False): client = KafkaClient('192.168.109.58:9092,192.168.109.70:9092,192.168.109.91:9092') produce_start = time.time() topic = client.topics['meteor_spider_article_dev'] # producer = topic.get_producer(sync=True, use_rdkafka=use_rdkafka) msg_body = { 'article_id': 1, "title": "标题", "subtitle": "副标题", } msg = json.dumps(msg_body) with topic.get_sync_producer() as producer: for i in range(0, 1000): producer.produce(msg) producer.stop() return time.time() - produce_start def calculate_thoughput(timing, n_messages=1000, msg_size=5956): print("Processed {0} messsages in {1:.2f} seconds".format(n_messages, timing)) print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024))) print("{0:.2f} Msgs/s".format(n_messages / timing)) if __name__ == '__main__': calculate_thoughput(kafka()) 

    Processed 1000 messsages in 76.68 seconds 0.07 MB/s 13.04 Msgs/s

    这速度 怎么回事?

    第 1 条附言    2016 年 9 月 7 日
    问题解决了, 在未使用 use_rdkafka 时, 默认的 linger_ms 很大, 会导致未在 min_queued_messages 内的 去做 sleep
    5 条回复    2016-09-07 15:50:51 +08:00
    sylecn
        1
    sylecn  
       2016 年 9 月 7 日
    topic.get_sync_producer()

    虽然还没有用过 kafka ,但是这种压力测试应该都用 async 模式来发消息吧。如果用同步,起码开多线程一起发。
    要不然一个一个等反馈多慢啊。
    reAsOn
        2
    reAsOn  
       2016 年 9 月 7 日
    用过 kafka-python 的库,性能可以接受,需要用异步发送 + batch
    996635
        3
    996635  
    OP
       2016 年 9 月 7 日
    @sylecn
    @reAsOn

    我 python -m cProfile 运行了一下,发现居然会有 time.sleep 在调用栈里

    ```
    4064 0.002 0.000 0.002 0.000 {thread.get_ident}
    8 0.000 0.000 0.000 0.000 {thread.start_new_thread}
    6615 68.400 0.010 68.400 0.010 {time.sleep}
    8616 0.006 0.000 0.006 0.000 {time.time}

    ```
    est
        4
    est  
       2016 年 9 月 7 日
    kafka 至少有 3 个 py 库,各自实现都不同。需要仔细判别。
    tongle
        5
    tongle  
       2016 年 9 月 7 日
    Using the librdkafka extension 试试这个