作者:中国工商银行软件开发中心
一、问题背景
对于商业银行来说,通常会在业务层面针对交易类场景会设置一个超时时间,比如 8s,少部分应用甚至会将超时时间设置为 4s。在某些批量调度中,若存在未经优化的 SQL 导致大事务或执行 DDL 操作的情况下,很容易持有记录级别甚至系统级别排它锁,导致那一时刻的联机交易被阻塞,从而引发交易成功率告警。
虽然绝大部分已建成的IT运维平台会提供包括 MDL 元数据锁等指标的监控,以及 MySQL 行锁等待超时参数等配置,但由于实际配置值均超过应用配置的交易超时时间,导致当应用频繁出现大量交易超时的问题时,数据库层面缺少相应的监控告警或事后排查手段。
二、现状说明
目前中心提供的监控和治理手段包括如下:
MySQL
innodb_lock_wait_timeout
参数(一个Innodb事务等待行锁释放的最长时间,单位秒),值为10s;
另外,中心自研的 MyAWR 工具,可以提供分钟级别的MySQL实例运行情况信息。
三、问题分析及着手点
对于目前情况,可以暂时不考虑增加额外的锁阻塞实时监控方面的工作,理由如下:
由于已存在元数据锁监控和行锁等待超时,时间更短的锁阻塞告警,本身不足够由人为介入处理,而自动处理的前提条件也存在不确定性
需要额外的更细粒度的信息采集和分析,至少到“秒”级别,可能存在对生产环境性能的影响
鉴于此,本文考虑将事后分析作为主要目标,在发生“大量交易超时”问题时,可以有系统化的方法或工具,排查出产生锁并
造成阻塞
的主要源头。
首先我们要清楚,OLTP 系统的数据库无时无刻都在产生大量的锁,但并不是所有的锁都会造成阻塞,只有当存在一定并发,并且有不合适或者不规范的加锁操作,或者确实存在非常热的资源争抢时,才会造成阻塞。而锁的持有和阻塞的发生,在极端情况下会产生严重的雪崩效应,可能会导致数据库连接池满、服务器CPU升高(打满)等情况,最终导致数据库服务无法响应。当然,在目前已有的监控告警和运行保障机制下,达到锁阻塞造成雪崩情况的可能性较低。
所以目前的主要目标是,通过历史运行记录,快速分析出造成阻塞的主要SQL,以及该SQL的发起源。
四、分析方法
以下均以 MySQL 为例,描述分析所需要的原始数据、加工方法等处理步骤,其它数据库如GaussDB、PG等均可类比。
1.采集阻塞信息
基于 MyAWR 工具提供的锁等待记录表:
-- myawr_db_innodb_lock_waits definition CREATE TABLE `myawr_db_innodb_lock_waits` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `snap_id` int(11) NOT NULL, `wait_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL, `wait_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL, `wait_age_secs` bigint(21) DEFAULT NULL, `locked_table` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `locked_index` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `locked_type` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_trx_id` varchar(18) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_trx_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_trx_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_trx_rows_locked` bigint(21) unsigned DEFAULT NULL, `waiting_trx_rows_modified` bigint(21) unsigned DEFAULT NULL, `waiting_pid` bigint(21) unsigned DEFAULT NULL, `waiting_query` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_lock_id` varchar(81) COLLATE utf8mb4_bin DEFAULT NULL, `waiting_lock_mode` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_trx_id` varchar(18) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_pid` bigint(21) unsigned DEFAULT NULL, `blocking_query` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_lock_id` varchar(81) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_lock_mode` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_trx_started` varchar(19) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_trx_age` varchar(8) COLLATE utf8mb4_bin DEFAULT NULL, `blocking_trx_rows_locked` bigint(21) unsigned DEFAULT NULL, `blocking_trx_rows_modified` bigint(21) unsigned DEFAULT NULL, `sql_kill_blocking_query` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL, `sql_kill_blocking_connection` varchar(26) COLLATE utf8mb4_bin DEFAULT NULL, `m` int(2) NOT NULL, PRIMARY KEY (`id`,`m`), KEY `idx_db_innodb_lock_waits` (`snap_id`) ) ENGINE=InnoDB AUTO_INCREMENT=26448 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
以及相应的采集语句(经测试,MySQL5.7和8.0版本均支持)
select DATE_FORMAT(`wait_started`, '%Y-%m-%d %H:%i:%s' ), TIME_FORMAT(`wait_age`, '%H:%i:%s' ), `wait_age_secs`, `locked_table`, `locked_index`, `locked_type`, `waiting_trx_id`, DATE_FORMAT(`waiting_trx_started`, '%Y-%m-%d %H:%i:%s' ), TIME_FORMAT(`waiting_trx_age`, '%H:%i:%s' ), `waiting_trx_rows_locked`, `waiting_trx_rows_modified`, `waiting_pid`, `waiting_query`, `waiting_lock_id`, `waiting_lock_mode`, `blocking_trx_id`, `blocking_pid`, IFNULL(`blocking_query`, '' ), `blocking_lock_id`, `blocking_lock_mode`, DATE_FORMAT(`blocking_trx_started`, '%Y-%m-%d %H:%i:%s' ), TIME_FORMAT(`blocking_trx_age`, '%H:%i:%s' ), `blocking_trx_rows_locked`, `blocking_trx_rows_modified`, `sql_kill_blocking_query`, `sql_kill_blocking_connection` from sys.x$innodb_lock_waits limit 500
需要注意的是,目前工具自动采集频率为1分钟,对于秒级的阻塞分析,其精确性会存在一定影响。另外采集语句中的 limit 子句进行了行数限制,若当前系统锁等待事件非常严重,等待信息可能会有所遗漏,可适当进行调整。
运行状态采集的
waiting_query
和
blocking_query
均为实际运行时SQL语句,对于相同结构,但是不同参数值的语句,需要进行反参数化处理,比如下方两条 SQL
UPDATE warehouse SET w_ytd = w_ytd + 1876.54 WHERE w_id = 1 UPDATE warehouse SET w_ytd = w_ytd + 3384.41 WHERE w_id = 3
我们认为是相同的 SQL,希望将其处理为以下格式:
UPDATE warehouse SET w_ytd = w_ytd + ? WHERE w_id = ?
由于并不需要完整的
queryrewriter
实现,故可以通过一个简单的方法仅对字符串和数字进行替换(下方示例为
go
实现)
func formatSQL(sql string) string { sarr := strings.Split(sql, "'" ) for i := 1; i sarr[i] = "?" } aa := strings.Join(sarr, "'" ) re := regexp.MustCompile(`\b\d+(\.\d+)?\b`) formattedSQL := re.ReplaceAllString(aa, "?" ) return formattedSQL }
改写后的结果如下图:
3.表阻塞情况分析
可以通过下方SQL查出指定时间段,被阻塞的表以及造成阻塞的语句,按采集的等待时间求和并倒序排序,以此可以粗略评估造成阻塞的主要原因。
select w.locked_table ,sum(w.wait_age_secs) as wait_sum ,w.blocking_query from myawr_db_innodb_lock_waits wwhere
w.wait_started between '2023-08-22 16:41:00' and '2023-08-22 17:00:00' group by w.locked_table, w.blocking_query order by wait_sum desc
当然,也可以结合阻塞明细表中的其他字段,进行更多维度的分析。
需要注意的是,分析本身只能输出定性结论,而无法给出精确的定量结果,原因如下:
本身采集语句就是一个“采样”,而采样本身是无法体现完整的信息,采集间隔约长,遗漏的信息就会越多;不过相应的,在较低的采样率下依然被采集到的记录,则可以说明依然是值得重点关注的。
其次,单次采样中的
blocking_query
字段,也并不是100%造成阻塞的语句,因为锁的持有是按“事务”为周期的,而一个事务中可能存在多条语句,如果采样时刻造成阻塞的语句已经执行完,则抓到的会是后面正在执行的语句,如上图会有“COMMIT”等语句在测试实验中被抓到。
再次,
wait_age_secs
字段的简单求和,并不真实代表被阻塞的总时长,因为存在采样率的差异,比如测试实验为1s,如果一个行锁持有者存在大于1s,又同时阻塞其它多个session,那么接下来的多次采集都会有该行锁的等待信息,加总时并不会过滤并筛选该行锁的最后一条采集记录(也是因为采样率的原因,并不能抓到每个行锁最终释放时间)。
所以分析等待情况的结果,只是用于定性分析,排查该时间段造成阻塞情况最严重的表和语句。
4.获取主要的阻塞 SQL 后,通过历史 session 记录可以检索与之相关的ip等信息
基于 MyAWR 的历史 session 表,根据SQL前缀匹配查找user字段中记录的ip信息,定位应用节点
-- mytest.myawr_db_session definition CREATE TABLE `myawr_db_session` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `snap_id` int(11) NOT NULL, `thd_id` bigint(20) unsigned DEFAULT NULL, `conn_id` bigint(20) unsigned DEFAULT NULL, `user` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL, `db` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL, `command ` varchar(16) COLLATE utf8mb4_bin DEFAULT NULL, `state` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL, `time` bigint(20) DEFAULT NULL, `current_statement` longtext COLLATE utf8mb4_bin, `statement_latency` bigint(20) unsigned DEFAULT NULL, `progress` decimal(26,0) DEFAULT NULL, `lock_latency` bigint(20) unsigned DEFAULT NULL, `rows_examined` bigint(20) unsigned DEFAULT NULL, `rows_sent` bigint(20) unsigned DEFAULT NULL, `rows_affected` bigint(20) unsigned DEFAULT NULL, `tmp_tables` bigint(20) unsigned DEFAULT NULL, `tmp_disk_tables` bigint(20) unsigned DEFAULT NULL, `full_scan` varchar(3) COLLATE utf8mb4_bin DEFAULT NULL, `last_statement` longtext COLLATE utf8mb4_bin, `last_statement_latency` bigint(20) unsigned DEFAULT NULL, `current_memory` decimal(41,0) DEFAULT NULL, `last_wait` varchar(128) COLLATE utf8mb4_bin DEFAULT NULL, `last_wait_latency` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `source ` varchar(64) COLLATE utf8mb4_bin DEFAULT NULL, `trx_latency` bigint(20) unsigned DEFAULT NULL, `trx_state` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `trx_autocommit` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `pid` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `program_name` varchar(1024) COLLATE utf8mb4_bin DEFAULT NULL, `m` int(2) NOT NULL, PRIMARY KEY (`id`,`m`), KEY `idx_db_session` (`snap_id`) ) ENGINE=InnoDB AUTO_INCREMENT=6317 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ;
五、后记
本文仅为定位到由锁造成阻塞导致业务影响时,在事后进行问题分析的方法讨论。至于事中的应急处理,各公司可能会有不同的手段,事中处理更关注的是寻找阻塞源并查杀,而事后分析更关注应用代码中的需要优化的目标,比如热点表和 SQL 语句等。
本文为锁阻塞事件的事后分析提供了一种思路,并提供了部分方法的实现,同样的,该方法也可以拓展到其它数据库产品的分析中,希望能对读者有所帮助。
附:测试用例
以下为 Go 语言的部分方法实现记录,以供参考。
var ( snap_time string uptime string uptime_flush string )type LockWaits struct { wait_started string wait_age string wait_age_secs int64 locked_table string locked_index string locked_type string waiting_trx_id string waiting_trx_started string waiting_trx_age string waiting_trx_rows_locked int64 waiting_trx_rows_modified int64 waiting_pid int64 waiting_query string waiting_lock_id string waiting_lock_mode string blocking_trx_id string blocking_pid int64 blocking_query string blocking_lock_id string blocking_lock_mode string blocking_trx_started string blocking_trx_age string blocking_trx_rows_locked int64 blocking_trx_rows_modified int64 sql_kill_blocking_query string sql_kill_blocking_connection string } func GetSnapshot() (snap_id, m int, err error) { var sql string currentTime := time.Now() snap_time = currentTime.Format("2006-01-02 15:04:05" ) m = int(currentTime.Month()) var dbClinent = mysql.InitConnect() sql = "SELECT date_format(date_sub(now(),interval variable_value second),'%Y-%m-%d %T') as t FROM performance_schema.global_status where variable_name in ('Uptime','Uptime_since_flush_status') order by variable_name" list, err := mysql.QueryAll(dbClinent, sql) if err != nil { wlog.Error(fmt.Sprintf("failed to query data: %s" , err)) return 0, 0, fmt.Errorf("failed to query data: %s" , err) } uptime = list[0]["t" ].(string) uptime_flush = list[1]["t" ].(string) sql = fmt.Sprintf("insert into myawr_snapshot(snap_time,uptime,uptime_flush,m) values('%s', '%s', '%s', %d)" , snap_time, uptime, uptime_flush, m) // wlog.Info(sql) err = mysql.Execute(dbClinent, sql) if err != nil { wlog.Error(fmt.Sprintf("failed to execute statement: %s" , err)) return 0, 0, fmt.Errorf("failed to execute statement: %s" , err) } sql = "SELECT LAST_INSERT_ID()" last_insert_id, err := mysql.QueryOne(dbClinent, sql) if err != nil { wlog.Error(fmt.Sprintf("failed to query data: %s" , err)) return 0, 0, fmt.Errorf("failed to query data: %s" , err) } snap_id, _ = strconv.Atoi(last_insert_id) dbClinent.Close() return snap_id, m, nil } func formatSQL(sql string) string { sarr := strings.Split(sql, "'" ) for i := 1; i sarr[i] = "?" } aa := strings.Join(sarr, "'" ) re := regexp.MustCompile(`\b\d+(\.\d+)?\b`) formattedSQL := re.ReplaceAllString(aa, "?" ) return formattedSQL } func FetchLockWaitsData(ip, port, user, pass string) ([]LockWaits, error) { mydb, err := mysql.Connect(ip, port, user, pass, "performance_schema" ) if err != nil { wlog.Error(fmt.Sprintf("failed to connect to source database on %s:%s, %s" , ip, port, err)) return nil, fmt.Errorf("failed to connect to source database: %s" , err) } rows, err := mydb.Query("SELECT DATE_FORMAT(`wait_started`, '%Y-%m-%d %H:%i:%s'), TIME_FORMAT(`wait_age`, '%H:%i:%s'), `wait_age_secs`, `locked_table`, `locked_index`, `locked_type`, `waiting_trx_id`, DATE_FORMAT(`waiting_trx_started`, '%Y-%m-%d %H:%i:%s'), TIME_FORMAT(`waiting_trx_age`, '%H:%i:%s'), `waiting_trx_rows_locked`, `waiting_trx_rows_modified`, `waiting_pid`, `waiting_query`, `waiting_lock_id`, `waiting_lock_mode`, `blocking_trx_id`, `blocking_pid`, IFNULL(`blocking_query`, ''), `blocking_lock_id`, `blocking_lock_mode`, DATE_FORMAT(`blocking_trx_started`, '%Y-%m-%d %H:%i:%s'), TIME_FORMAT(`blocking_trx_age`, '%H:%i:%s'), `blocking_trx_rows_locked`, `blocking_trx_rows_modified`, `sql_kill_blocking_query`, `sql_kill_blocking_connection` FROM sys.x$innodb_lock_waits LIMIT 500" ) if err != nil { wlog.Error(fmt.Sprintf("failed to fetch data from source table: %s" , err)) return nil, fmt.Errorf("failed to fetch data from source table: %s" , err) } defer rows.Close() var data []LockWaits for rows.Next () { var rowData LockWaits err := rows.Scan(&rowData.wait_started, &rowData.wait_age, &rowData.wait_age_secs, &rowData.locked_table, &rowData.locked_index, &rowData.locked_type, &rowData.waiting_trx_id, &rowData.waiting_trx_started, &rowData.waiting_trx_age, &rowData.waiting_trx_rows_locked, &rowData.waiting_trx_rows_modified, &rowData.waiting_pid, &rowData.waiting_query, &rowData.waiting_lock_id, &rowData.waiting_lock_mode, &rowData.blocking_trx_id, &rowData.blocking_pid, &rowData.blocking_query, &rowData.blocking_lock_id, &rowData.blocking_lock_mode, &rowData.blocking_trx_started, &rowData.blocking_trx_age, &rowData.blocking_trx_rows_locked, &rowData.blocking_trx_rows_modified, &rowData.sql_kill_blocking_query, &rowData.sql_kill_blocking_connection) if err != nil { wlog.Error(fmt.Sprintf("failed to scan row data: %s" , err)) return nil, fmt.Errorf("failed to scan row data: %s" , err) } defer mydb.Close() data = append(data, rowData) } mydb.Close() return data, nil } func InsertLockWaitsData(data []LockWaits) error { var dbClinent = mysql.InitConnect() tableName := "myawr_db_innodb_lock_waits" stmt, err := dbClinent.Prepare(fmt.Sprintf("INSERT INTO %s " + "(snap_id, wait_started, wait_age, wait_age_secs, locked_table, locked_index, locked_type, waiting_trx_id, waiting_trx_started, waiting_trx_age, waiting_trx_rows_locked, waiting_trx_rows_modified, waiting_pid, waiting_query, waiting_lock_id, waiting_lock_mode, blocking_trx_id, blocking_pid, blocking_query, blocking_lock_id, blocking_lock_mode, blocking_trx_started, blocking_trx_age, blocking_trx_rows_locked, blocking_trx_rows_modified, sql_kill_blocking_query, sql_kill_blocking_connection, m) " + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);" , tableName)) if err != nil { wlog.Error(fmt.Sprintf("failed to prepare INSERT statement: %s" , err)) return fmt.Errorf("failed to prepare INSERT statement: %s" , err) } defer stmt.Close() snap_id, m, err := GetSnapshot() if err != nil { wlog.Error(fmt.Sprintf("failed to get snapshot: %s" , err)) return fmt.Errorf("failed to get snapshot: %s" , err) } for _, rowData := range data { formatted := formatSQL(rowData.blocking_query) _, err := stmt.Exec(snap_id, rowData.wait_started, rowData.wait_age, rowData.wait_age_secs, rowData.locked_table, rowData.locked_index, rowData.locked_type, rowData.waiting_trx_id, rowData.waiting_trx_started, rowData.waiting_trx_age, rowData.waiting_trx_rows_locked, rowData.waiting_trx_rows_modified, rowData.waiting_pid, rowData.waiting_query, rowData.waiting_lock_id, rowData.waiting_lock_mode, rowData.blocking_trx_id, rowData.blocking_pid, formatted, rowData.blocking_lock_id, rowData.blocking_lock_mode, rowData.blocking_trx_started, rowData.blocking_trx_age, rowData.blocking_trx_rows_locked, rowData.blocking_trx_rows_modified, rowData.sql_kill_blocking_query, rowData.sql_kill_blocking_connection, m) if err != nil { wlog.Error(fmt.Sprintf("failed to insert data: %s" , err)) return fmt.Errorf("failed to insert data: %s" , err) } } dbClinent.Close() return nil } func main () { // mini myawr ticker := time.Tick(1 * time.Second) timer := time.After(300 * time.Second) for { select { case fmt.Println("任务执行" ) lockwaitsdata, _ := mysql.FetchLockWaitsData(conf.Option["mysql_host" ], conf.Option["mysql_port" ], conf.Option["mysql_user" ], conf.Option["mysql_password" ]) _ = mysql.InsertLockWaitsData(lockwaitsdata) case fmt.Println("程序结束" ) return } } }
近期好文:
高效部署:基于 Prometheus 的云原生监控,用完真香?
“高效运维”公众号诚邀广大技术人员投稿
投稿邮箱:[email protected] ,或添加联系人微信:greatops1118。