FunTester 基于时间戳的日志回放引擎

random 测试交流3120字数 5962阅读模式

作者

FunTester

FunTester

FunTester,Have Fun ~ Tester !

之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。

查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。文章源自玩技e族-https://www.playezu.com/239674.html

解决思路

目前流量回放集中于 HTTP 流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:文章源自玩技e族-https://www.playezu.com/239674.html

  1. 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含 HTTP 请求的组成部分。
  2. 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
  3. 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。

其中最最核心的应该就是队列的选择,这里我用看 java 的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:文章源自玩技e族-https://www.playezu.com/239674.html

  • Java&Go 高性能队列之 LinkedBlockingQueue 性能测试 2022-01-10
  • Java&Go 高性能队列之 Disruptor 性能测试 2022-02-14
  • Java&Go 高性能队列之 channel 性能测试 2022-02-17

本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。文章源自玩技e族-https://www.playezu.com/239674.html

然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列 DelayQueue 性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟 10s 撤单性能测试。文章源自玩技e族-https://www.playezu.com/239674.html

实现

总体来说实现起来思路比较清晰,我分成三部分分享。文章源自玩技e族-https://www.playezu.com/239674.html

属性定义

  1. 我首先定义了一个com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志
  2. 然后定义一个com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是 OK 的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。
  3. 再定义com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs中取,clone之后丢到队列中;消费者从队列中取对象,丢给线程池。
  4. 定义com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成HttpRequestBase对象然后发送出去

生产者

  1. 确定使用异步线程完成,使用Java 自定义异步功能实践。
  2. 根据com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。
  3. 多线程取com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。
  4. 使用了com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的 API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠 1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免 1s 中内队列变成空。回放引擎设计 50 万 QPS,所以我就先设置了 80 万的最大长度。后续可以根据实际情况调整。

消费者

  1. 依旧使用异步,生产者
  2. 使用 API 时java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。
  3. 引入com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。
  4. 使用com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。
  5. 使用com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。

代码如下:文章源自玩技e族-https://www.playezu.com/239674.html

package com.funtester.frame.execute
import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder
/**
* 回放功能执行类*/
class ReplayConcurrent extends SourceCode {
private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);
static ThreadPoolExecutor executor
static boolean key = true
static int MAX_LENGTH = 800000
int threadNum = 2
String name
String fileName
int multiple
Closure handle
List<ReplayLog> logs
DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()
LongAdder total = new LongAdder()
ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
this.name = name
this.fileName = fileName
this.multiple = multiple
this.handle = handle
}
void start() {
if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
time({
RWUtil.readFile(fileName, {
def delay = new ReplayLog(it)
if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
})
}, 1, "读取日志$fileName")
logs = logDelayQueue.toList()
def timestamp = logs.get(0).getTimestamp()
logDelayQueue.clear()
AtomicInteger index = new AtomicInteger()
AtomicInteger size = new AtomicInteger()
def LogSize = logs.size()
AtomicInteger diff = new AtomicInteger()
threadNum.times {
fun {
while (key) {
if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
if (size.get() > MAX_LENGTH) {
sleep(1.0)
size.set(logDelayQueue.size())
}
def replay = logs.get(index.getAndIncrement() % LogSize)
logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
}
}
}
threadNum.times {
fun {
while (key) {
def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
if (poll != null) {
executor.execute {
multiple.times {
handle(poll.getUrl())
total.add(1)
}
}
}
}
}
}
fun {
while (key) {
sleep(COUNT_INTERVAL as double)
int real = total.sumThenReset() / COUNT_INTERVAL as int
def active = executor.getActiveCount()
def count = active == 0 ? 1 : active
logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
}
}
}
/**
* 中止
* @return
*/
def stop() {
key = false
executor.shutdown()
logger.info("replay压测关闭了!")
}
/**
* 日志对象*/
static class ReplayLog extends AbstractBean implements Delayed {
int timestamp
String url
ReplayLog(String logLine) {
def log = LogUtil.getLog(logLine)
this.url = log.getUrl()
this.timestamp = log.getTime()
}
ReplayLog(int timestamp, String url) {
this.timestamp = timestamp
this.url = url
}
@Override
long getDelay(TimeUnit unit) {
return this.timestamp - getMark()
}
@Override
int compareTo(Delayed o) {
return this.timestamp - o.timestamp
}
protected Object clone(int timestamp) {
return new ReplayLog(timestamp, this.url)
}
}
}

自测

下面是我的测试用例:文章源自玩技e族-https://www.playezu.com/239674.html

package com.okcoin.hickwall.presses
import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp
class RplayT extends FunHttp {
static String HOST = "http://localhost:12345"
public static void main(String[] args) {
def fileName = "api.log"
new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
getHttpResponse(getHttpGet(HOST + url))
}).start()
}
}

测试结果如下:文章源自玩技e族-https://www.playezu.com/239674.html

22:45:43.510 main
###### #     #  #    # ####### ######  #####  ####### ######  #####
#      #     #  ##   #    #    #       #         #    #       #    #
####   #     #  # #  #    #    ####    #####     #    ####    #####
#      #     #  #  # #    #    #            #    #    #       #   #
#       #####   #    #    #    ######  #####     #    ######  #    #
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392
  • 接口功能测试专题
  • 性能测试专题
  • Groovy 专题
  • Java、Groovy、Go、Python
  • 单测&白盒
  • FunTester 社群风采
  • 测试理论鸡汤
  • FunTester 视频专题
  • 案例分享:方案、BUG、爬虫
  • UI 自动化专题
  • 测试工具专题
文章源自玩技e族-https://www.playezu.com/239674.html
玩技站长微信
添加好友自动发送入群邀请
weinxin
rainbow-shownow
玩技官方公众号
官方微信公众号
weinxin
PLAYEZU
 
  • 版权提示:本站仅供存储任何法律责任由作者承担▷诈骗举报◁▷新闻不符◁▷我要投稿◁
    风险通知:非原创文章均为网络投稿真实性无法判断,侵权联系2523030730
    免责声明:内容来自用户上传发布或新闻客户端自媒体,切勿!切勿!切勿!添加联系方式以免受骗。
  • 原创转载:https://www.playezu.com/239674.html
    转载说明: 点我前往阅读>>>
评论  3  访客  3
    • Thirty-Thirty
      Thirty-Thirty 9

      没关系,如果后期有了不涉及公司业务的新思路,再补充分享吧,这是比较有意义的。

      • FunTester
        FunTester 9

        感谢。日志清洗涉及 2 个步骤:1.把不符合要求的日志过滤掉(比如格式错误,压测黑名单等);2.提取日志中有用信息(包括 url,header 部分、时间戳、来源信息等),由于日志格式是架构定义的,所以有一部分信息无用。
        这两部分都涉及公司业务相关的,不方便分享。

        • Thirty-Thirty
          Thirty-Thirty 9

          对于日志回放,难得有如此详细的分析和说明,楼主用心了。

          楼主先给出了解决思路,然后按照思路给出了实现方法和过程,便于大家理解。

          问个问题啊,解决思路里第一步是日志清洗,文章里好像没详述具体实现,这块能具体描述下吗?

        匿名

        发表评论

        匿名网友
        确定