专栏名称: 高效运维
高效运维公众号由萧田国及朋友们维护,经常发布各种广为传播的优秀原创技术文章,关注运维转型,陪伴您的运维职业生涯,一起愉快滴发展。
目录
相关文章推荐
51好读  ›  专栏  ›  高效运维

商业银行数据库锁阻塞情况分析方法探索

高效运维  · 公众号  ·  · 2023-12-13 07:10

正文

请到「今天看啥」查看全文


作者:中国工商银行软件开发中心

一、问题背景

对于商业银行来说,通常会在业务层面针对交易类场景会设置一个超时时间,比如 8s,少部分应用甚至会将超时时间设置为 4s。在某些批量调度中,若存在未经优化的 SQL 导致大事务或执行 DDL 操作的情况下,很容易持有记录级别甚至系统级别排它锁,导致那一时刻的联机交易被阻塞,从而引发交易成功率告警。

虽然绝大部分已建成的IT运维平台会提供包括 MDL 元数据锁等指标的监控,以及 MySQL 行锁等待超时参数等配置,但由于实际配置值均超过应用配置的交易超时时间,导致当应用频繁出现大量交易超时的问题时,数据库层面缺少相应的监控告警或事后排查手段。

二、现状说明

目前中心提供的监控和治理手段包括如下:

  • MDL元数据锁监控,阈值为15s;
  • MySQL innodb_lock_wait_timeout 参数(一个Innodb事务等待行锁释放的最长时间,单位秒),值为10s;
  • 大事务查杀机制
  • 慢 SQL 自动查杀机制

另外,中心自研的 MyAWR 工具,可以提供分钟级别的MySQL实例运行情况信息。

三、问题分析及着手点

对于目前情况,可以暂时不考虑增加额外的锁阻塞实时监控方面的工作,理由如下:

  1. 由于已存在元数据锁监控和行锁等待超时,时间更短的锁阻塞告警,本身不足够由人为介入处理,而自动处理的前提条件也存在不确定性

  • 自动处理是否准确,比如查杀的阻塞源能否 kill

  • 行锁等待超时会将后发起的请求锁的 session 自动结束,一般会触发应用端重试, 逻辑上也符合处理流程

  1. 需要额外的更细粒度的信息采集和分析,至少到“秒”级别,可能存在对生产环境性能的影响

鉴于此,本文考虑将事后分析作为主要目标,在发生“大量交易超时”问题时,可以有系统化的方法或工具,排查出产生锁并 造成阻塞 的主要源头。
首先我们要清楚,OLTP 系统的数据库无时无刻都在产生大量的锁,但并不是所有的锁都会造成阻塞,只有当存在一定并发,并且有不合适或者不规范的加锁操作,或者确实存在非常热的资源争抢时,才会造成阻塞。而锁的持有和阻塞的发生,在极端情况下会产生严重的雪崩效应,可能会导致数据库连接池满、服务器CPU升高(打满)等情况,最终导致数据库服务无法响应。当然,在目前已有的监控告警和运行保障机制下,达到锁阻塞造成雪崩情况的可能性较低。
所以目前的主要目标是,通过历史运行记录,快速分析出造成阻塞的主要SQL,以及该SQL的发起源。

四、分析方法

以下均以 MySQL 为例,描述分析所需要的原始数据、加工方法等处理步骤,其它数据库如GaussDB、PG等均可类比。

1.采集阻塞信息

基于 MyAWR 工具提供的锁等待记录表:

-- myawr_db_innodb_lock_waits definition

CREATE TABLE `myawr_db_innodb_lock_waits` (
 `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
 `snap_id` int(11) NOT NULL,
 `wait_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL,
 `wait_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL,
 `wait_age_secs` bigint(21) DEFAULT NULL,
 `locked_table` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
 `locked_index` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
 `locked_type` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_trx_id` varchar(18) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_trx_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_trx_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_trx_rows_locked` bigint(21) unsigned DEFAULT NULL,
 `waiting_trx_rows_modified` bigint(21) unsigned DEFAULT NULL,
 `waiting_pid` bigint(21) unsigned DEFAULT NULL,
 `waiting_query` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_lock_id` varchar(81) COLLATE utf8mb4_bin DEFAULT NULL,
 `waiting_lock_mode` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_trx_id` varchar(18) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_pid` bigint(21) unsigned DEFAULT NULL,
 `blocking_query` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_lock_id` varchar(81) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_lock_mode` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_trx_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_trx_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL,
 `blocking_trx_rows_locked` bigint(21) unsigned DEFAULT NULL,
 `blocking_trx_rows_modified` bigint(21) unsigned DEFAULT NULL,
 `sql_kill_blocking_query` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
 `sql_kill_blocking_connection` varchar(26) COLLATE utf8mb4_bin DEFAULT NULL,
 `m` int(2) NOT NULL,
  PRIMARY KEY (`id`,`m`),
  KEY `idx_db_innodb_lock_waits` (`snap_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26448 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

以及相应的采集语句(经测试,MySQL5.7和8.0版本均支持)

select
  DATE_FORMAT(`wait_started`, '%Y-%m-%d %H:%i:%s'),
  TIME_FORMAT(`wait_age`, '%H:%i:%s'),
  `wait_age_secs`,
  `locked_table`,
  `locked_index`,
  `locked_type`,
  `waiting_trx_id`,
  DATE_FORMAT(`waiting_trx_started`, '%Y-%m-%d %H:%i:%s'),
  TIME_FORMAT(`waiting_trx_age`, '%H:%i:%s'),
  `waiting_trx_rows_locked`,
  `waiting_trx_rows_modified`,
  `waiting_pid`,
  `waiting_query`,
  `waiting_lock_id`,
  `waiting_lock_mode`,
  `blocking_trx_id`,
  `blocking_pid`,
  IFNULL(`blocking_query`, ''),
  `blocking_lock_id`,
  `blocking_lock_mode`,
  DATE_FORMAT(`blocking_trx_started`, '%Y-%m-%d %H:%i:%s'),
  TIME_FORMAT(`blocking_trx_age`, '%H:%i:%s'),
  `blocking_trx_rows_locked`,
  `blocking_trx_rows_modified`,
  `sql_kill_blocking_query`,
  `sql_kill_blocking_connection`
from
    sys.x$innodb_lock_waits
limit 500
需要注意的是,目前工具自动采集频率为1分钟,对于秒级的阻塞分析,其精确性会存在一定影响。另外采集语句中的 limit 子句进行了行数限制,若当前系统锁等待事件非常严重,等待信息可能会有所遗漏,可适当进行调整。
2.SQL语句反参数化处理

运行状态采集的 waiting_query blocking_query 均为实际运行时SQL语句,对于相同结构,但是不同参数值的语句,需要进行反参数化处理,比如下方两条 SQL

UPDATE warehouse SET w_ytd = w_ytd + 1876.54 WHERE w_id = 1 
UPDATE warehouse SET w_ytd = w_ytd + 3384.41 WHERE w_id = 3

我们认为是相同的 SQL,希望将其处理为以下格式:

UPDATE warehouse SET w_ytd = w_ytd + ? WHERE w_id = ?

由于并不需要完整的 queryrewriter 实现,故可以通过一个简单的方法仅对字符串和数字进行替换(下方示例为 go 实现)

func formatSQL(sql string) string {
    sarr := strings.Split(sql, "'")
    for i := 1; i         sarr[i] = "?"
    }
    aa := strings.Join(sarr, "'")
    re := regexp.MustCompile(`\b\d+(\.\d+)?\b`)
    formattedSQL := re.ReplaceAllString(aa, "?")
    return formattedSQL
}

改写后的结果如下图:

3.表阻塞情况分析

可以通过下方SQL查出指定时间段,被阻塞的表以及造成阻塞的语句,按采集的等待时间求和并倒序排序,以此可以粗略评估造成阻塞的主要原因。

select
    w.locked_table
    ,sum(w.wait_age_secs) as wait_sum
    ,w.blocking_query
from
    myawr_db_innodb_lock_waits w
where
    w.wait_started between '2023-08-22 16:41:00' and '2023-08-22 17:00:00'
group by
    w.locked_table,
    w.blocking_query
order by
    wait_sum desc
当然,也可以结合阻塞明细表中的其他字段,进行更多维度的分析。

需要注意的是,分析本身只能输出定性结论,而无法给出精确的定量结果,原因如下:
  • 本身采集语句就是一个“采样”,而采样本身是无法体现完整的信息,采集间隔约长,遗漏的信息就会越多;不过相应的,在较低的采样率下依然被采集到的记录,则可以说明依然是值得重点关注的。

  • 其次,单次采样中的 blocking_query 字段,也并不是100%造成阻塞的语句,因为锁的持有是按“事务”为周期的,而一个事务中可能存在多条语句,如果采样时刻造成阻塞的语句已经执行完,则抓到的会是后面正在执行的语句,如上图会有“COMMIT”等语句在测试实验中被抓到。
  • 再次, wait_age_secs 字段的简单求和,并不真实代表被阻塞的总时长,因为存在采样率的差异,比如测试实验为1s,如果一个行锁持有者存在大于1s,又同时阻塞其它多个session,那么接下来的多次采集都会有该行锁的等待信息,加总时并不会过滤并筛选该行锁的最后一条采集记录(也是因为采样率的原因,并不能抓到每个行锁最终释放时间)。

所以分析等待情况的结果,只是用于定性分析,排查该时间段造成阻塞情况最严重的表和语句。
4.获取主要的阻塞 SQL 后,通过历史 session 记录可以检索与之相关的ip等信息
基于 MyAWR 的历史 session 表,根据SQL前缀匹配查找user字段中记录的ip信息,定位应用节点
-- mytest.myawr_db_session definition

CREATE TABLE `myawr_db_session` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `snap_id` int(11) NOT NULL,
  `thd_id` bigint(20) unsigned DEFAULT NULL,
  `conn_id` bigint(20) unsigned DEFAULT NULL,
  `user` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL,
  `db` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL,
  `command` varchar(16) COLLATE utf8mb4_bin DEFAULT NULL,
  `state` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL,
  `time` bigint(20) DEFAULT NULL,
  `current_statement` longtext COLLATE utf8mb4_bin,
  `statement_latency` bigint(20) unsigned DEFAULT NULL,
  `progress` decimal(26,0) DEFAULT NULL,
  `lock_latency` bigint(20) unsigned DEFAULT NULL,
  `rows_examined` bigint(20) unsigned DEFAULT NULL,
  `rows_sent` bigint(20) unsigned DEFAULT NULL,
  `rows_affected` bigint(20) unsigned DEFAULT NULL,
  `tmp_tables` bigint(20) unsigned DEFAULT NULL,
  `tmp_disk_tables` bigint(20) unsigned DEFAULT NULL,
  `full_scan` varchar(3) COLLATE utf8mb4_bin DEFAULT NULL,
  `last_statement` longtext COLLATE utf8mb4_bin,
  `last_statement_latency` bigint(20) unsigned DEFAULT NULL,
  `current_memory` decimal(41,0) DEFAULT NULL,
  `last_wait` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL,
  `last_wait_latency` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  `source` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL,
  `trx_latency` bigint(20) unsigned DEFAULT NULL,
  `trx_state` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  `trx_autocommit` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL,
  `pid` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
  `program_name` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL,
  `m` int(2) NOT NULL,
  PRIMARY KEY (`id`,`m`),
  KEY `idx_db_session` (`snap_id`)
) ENGINE=InnoDB AUTO_INCREMENT=6317 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
;

五、后记

本文仅为定位到由锁造成阻塞导致业务影响时,在事后进行问题分析的方法讨论。至于事中的应急处理,各公司可能会有不同的手段,事中处理更关注的是寻找阻塞源并查杀,而事后分析更关注应用代码中的需要优化的目标,比如热点表和 SQL 语句等。
本文为锁阻塞事件的事后分析提供了一种思路,并提供了部分方法的实现,同样的,该方法也可以拓展到其它数据库产品的分析中,希望能对读者有所帮助。

附:测试用例

以下为 Go 语言的部分方法实现记录,以供参考。

var (
    snap_time    string
    uptime       string
    uptime_flush string
)

type LockWaits struct {
    wait_started                 string
    wait_age                     string
    wait_age_secs                int64
    locked_table                 string
    locked_index                 string
    locked_type                  string
    waiting_trx_id               string
    waiting_trx_started          string
    waiting_trx_age              string
    waiting_trx_rows_locked      int64
    waiting_trx_rows_modified    int64
    waiting_pid                  int64
    waiting_query                string
    waiting_lock_id              string
    waiting_lock_mode            string
    blocking_trx_id              string
    blocking_pid                 int64
    blocking_query               string
    blocking_lock_id             string
    blocking_lock_mode           string
    blocking_trx_started         string
    blocking_trx_age             string
    blocking_trx_rows_locked     int64
    blocking_trx_rows_modified   int64
    sql_kill_blocking_query      string
    sql_kill_blocking_connection string
}

func GetSnapshot() (snap_id, m int, err error) {
    var sql string
    currentTime := time.Now()
    snap_time = currentTime.Format("2006-01-02 15:04:05")
    m = int(currentTime.Month())
    var dbClinent = mysql.InitConnect()
    sql = "SELECT date_format(date_sub(now(),interval variable_value second),'%Y-%m-%d %T') as t FROM performance_schema.global_status where variable_name in ('Uptime','Uptime_since_flush_status') order by variable_name"
    list, err := mysql.QueryAll(dbClinent, sql)
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to query data: %s", err))
        return 0, 0, fmt.Errorf("failed to query data: %s", err)
    }

    uptime = list[0]["t"].(string)
    uptime_flush = list[1]["t"].(string)

    sql = fmt.Sprintf("insert into myawr_snapshot(snap_time,uptime,uptime_flush,m) values('%s', '%s', '%s', %d)", snap_time, uptime, uptime_flush, m)
    // wlog.Info(sql)
    err = mysql.Execute(dbClinent, sql)
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to execute statement: %s", err))
        return 0, 0, fmt.Errorf("failed to execute statement: %s", err)
    }
    sql = "SELECT LAST_INSERT_ID()"
    last_insert_id, err := mysql.QueryOne(dbClinent, sql)
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to query data: %s", err))
        return 0, 0, fmt.Errorf("failed to query data: %s", err)
    }
    snap_id, _ = strconv.Atoi(last_insert_id)
    dbClinent.Close()
    return snap_id, m, nil
}

func formatSQL(sql string) string {
    sarr := strings.Split(sql, "'")
    for i := 1; i         sarr[i] = "?"
    }
    aa := strings.Join(sarr, "'")
    re := regexp.MustCompile(`\b\d+(\.\d+)?\b`)
    formattedSQL := re.ReplaceAllString(aa, "?")
    return formattedSQL
}

func FetchLockWaitsData(ip, port, user, pass string) ([]LockWaits, error) {
    mydb, err := mysql.Connect(ip, port, user, pass, "performance_schema")
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to connect to source database on %s:%s, %s", ip, port, err))
        return nil, fmt.Errorf("failed to connect to source database: %s", err)
    }

    rows, err := mydb.Query("SELECT DATE_FORMAT(`wait_started`, '%Y-%m-%d %H:%i:%s'), TIME_FORMAT(`wait_age`, '%H:%i:%s'),  `wait_age_secs`,    `locked_table`, `locked_index`, `locked_type`,  `waiting_trx_id`,   DATE_FORMAT(`waiting_trx_started`, '%Y-%m-%d %H:%i:%s'),    TIME_FORMAT(`waiting_trx_age`, '%H:%i:%s'), `waiting_trx_rows_locked`,  `waiting_trx_rows_modified`,    `waiting_pid`,  `waiting_query`,    `waiting_lock_id`,  `waiting_lock_mode`,    `blocking_trx_id`,  `blocking_pid`, IFNULL(`blocking_query`, ''),   `blocking_lock_id`, `blocking_lock_mode`,   DATE_FORMAT(`blocking_trx_started`, '%Y-%m-%d %H:%i:%s'),   TIME_FORMAT(`blocking_trx_age`, '%H:%i:%s'),    `blocking_trx_rows_locked`, `blocking_trx_rows_modified`,   `sql_kill_blocking_query`,  `sql_kill_blocking_connection` FROM sys.x$innodb_lock_waits LIMIT 500")
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to fetch data from source table: %s", err))
        return nil, fmt.Errorf("failed to fetch data from source table: %s", err)
    }
    defer rows.Close()

    var data []LockWaits
    for rows.Next() {
        var rowData LockWaits
        err := rows.Scan(&rowData.wait_started, &rowData.wait_age, &rowData.wait_age_secs, &rowData.locked_table, &rowData.locked_index, &rowData.locked_type, &rowData.waiting_trx_id, &rowData.waiting_trx_started, &rowData.waiting_trx_age, &rowData.waiting_trx_rows_locked, &rowData.waiting_trx_rows_modified, &rowData.waiting_pid, &rowData.waiting_query, &rowData.waiting_lock_id, &rowData.waiting_lock_mode, &rowData.blocking_trx_id, &rowData.blocking_pid, &rowData.blocking_query, &rowData.blocking_lock_id, &rowData.blocking_lock_mode, &rowData.blocking_trx_started, &rowData.blocking_trx_age, &rowData.blocking_trx_rows_locked, &rowData.blocking_trx_rows_modified, &rowData.sql_kill_blocking_query, &rowData.sql_kill_blocking_connection)
        if err != nil {
            wlog.Error(fmt.Sprintf("failed to scan row data: %s", err))
            return nil, fmt.Errorf("failed to scan row data: %s", err)
        }
        defer mydb.Close()
        data = append(data, rowData)
    }
    mydb.Close()
    return data, nil
}

func InsertLockWaitsData(data []LockWaits) error {
    var dbClinent = mysql.InitConnect()
    tableName := "myawr_db_innodb_lock_waits"
    stmt, err := dbClinent.Prepare(fmt.Sprintf("INSERT INTO %s "+
        "(snap_id, wait_started, wait_age, wait_age_secs, locked_table, locked_index, locked_type, waiting_trx_id, waiting_trx_started, waiting_trx_age, waiting_trx_rows_locked, waiting_trx_rows_modified, waiting_pid, waiting_query, waiting_lock_id, waiting_lock_mode, blocking_trx_id, blocking_pid, blocking_query, blocking_lock_id, blocking_lock_mode, blocking_trx_started, blocking_trx_age, blocking_trx_rows_locked, blocking_trx_rows_modified, sql_kill_blocking_query, sql_kill_blocking_connection, m) "+
        "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", tableName))
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to prepare INSERT statement: %s", err))
        return fmt.Errorf("failed to prepare INSERT statement: %s", err)
    }
    defer stmt.Close()

    snap_id, m, err := GetSnapshot()
    if err != nil {
        wlog.Error(fmt.Sprintf("failed to get snapshot: %s", err))
        return fmt.Errorf("failed to get snapshot: %s", err)
    }

    for _, rowData := range data {
        formatted := formatSQL(rowData.blocking_query)
        _, err := stmt.Exec(snap_id, rowData.wait_started, rowData.wait_age, rowData.wait_age_secs, rowData.locked_table, rowData.locked_index, rowData.locked_type, rowData.waiting_trx_id, rowData.waiting_trx_started, rowData.waiting_trx_age, rowData.waiting_trx_rows_locked, rowData.waiting_trx_rows_modified, rowData.waiting_pid, rowData.waiting_query, rowData.waiting_lock_id, rowData.waiting_lock_mode, rowData.blocking_trx_id, rowData.blocking_pid, formatted, rowData.blocking_lock_id, rowData.blocking_lock_mode, rowData.blocking_trx_started, rowData.blocking_trx_age, rowData.blocking_trx_rows_locked, rowData.blocking_trx_rows_modified, rowData.sql_kill_blocking_query, rowData.sql_kill_blocking_connection, m)
        if err != nil {
            wlog.Error(fmt.Sprintf("failed to insert data: %s", err))
            return fmt.Errorf("failed to insert data: %s", err)
        }
    }
    dbClinent.Close()
    return nil
}

func main() {
    // mini myawr
    ticker := time.Tick(1 * time.Second)
    timer := time.After(300 * time.Second)
    for {
        select {
        case             fmt.Println("任务执行")
            lockwaitsdata, _ := mysql.FetchLockWaitsData(conf.Option["mysql_host"], conf.Option["mysql_port"], conf.Option["mysql_user"], conf.Option["mysql_password"])
            _ = mysql.InsertLockWaitsData(lockwaitsdata)
        case             fmt.Println("程序结束")
            return
        }
    }
}

近期好文:

高效部署:基于 Prometheus 的云原生监控,用完真香?

“高效运维”公众号诚邀广大技术人员投稿

投稿邮箱:[email protected],或添加联系人微信:greatops1118。







请到「今天看啥」查看全文