点击上方“程序员大咖”,选择“置顶公众号”
关键时刻,第一时间送达!
![](http://mmbiz.qpic.cn/mmbiz_jpg/ow6przZuPIG18kzbzliaybJA5y49s2zn9OVOziayVbGUmujwPYZLVGmbMalFKZr8GstdMVntUWN6NVaibmrbsQR4w/640?)
![](http://mmbiz.qpic.cn/mmbiz_gif/XUfq62QbuNiaiaU6R5GZib4Hn57JQ4pTHGlWvH7zojXP2pDexPyuJraKdFlLSzhVrufnyibfrvIJQ7n1Z9uMsZR7Ew/640?wx_fmt=gif)
作为数据工程师或者数据分析师,经常会跟各种数据打交道,其中,获取数据这一关是无法避免的,下面,我就将自己时常工作中用到的数据连接配置模型分享出来,供大家交流。
MySQL数据库
mysql数据库是目前用的最多的数据库之一,此处我做的是读和写的接口,而删除和更新操作,一般不是分析师做的,而是开发,所以我没有做这个。
import MySQLdb
import pandas as pd
from sqlalchemy import create_engine
class con_analyze:
"""数据分析平台连接"""
def __init__(self, database='myanalyze'):
self.database = database
self.conn = None
def connect(self):
self.conn = MySQLdb.connect(host='***', user='root', passwd='***', db=self.database, charset='utf8')
def query(self, sql):
try:
self.connect()
data = pd.read_sql(sql, self.conn)
except (AttributeError,
MySQLdb.OperationalError):
self.connect()
data = pd.read_sql(sql, self.conn) # 读取数据出现错误,再次连接
return data
def store(self, mydataframe, table_name, if_exists='replace'):
conn2 = "mysql+mysqldb://root:***@***:3306/%s" % self.database
local_engine = create_engine(conn2)
mydataframe.to_sql(table_name, local_engine, if_exists=if_exists, index=False, chunksize=10000)
'''还可以加一个函数用来执行单条sql语句,不仅仅是读取数据,还可以update,create等'''
作为一个链接类来使用,初始化的时候给出的conn是None,只有在执行查询函数的时候才创建链接,(链接中,我隐去了自己的host信息,你需要将自己的host填进去)
查询的时候使用了try语句,如果链接不成功或者查询不成功,就会出错,如果是链接不成功,那就在异常中再次连接。关于重复执行一段代码,有一个库大家可以关注一下:tenacity 这个库能让你实现更优雅(pythonic)的代码重复
此处读取数据是使用pandas库中的read_sql函数,此函数可以直接将查询结果转化成一个dataframe,方便了后面的分析工作
存储功能也是使用dataframe的函数tosql,此函数是将一个df直接转化成sql数据存入数据库,如果tablename存在,可以选择替换(replace)、增加(append)等,如果df很大很长,就需要设置一下chunksize参数
chunksize的设定,程序会自动将你的长达几十万行的df迭代存储,每次只存储10000行(这个数字是我设定的,你也可以改)。
看到这里,你可能会有疑问,为什么读和写的conn不一样,一个是用 MySQLdb.connect创建,而另一个是用create_engine创建。我想说的是,后面这个conn2其实可以作为读的连接参数,但是使用 MySQLdb.connect创建的连接却不一定能用来写,因为我在实践中多次运行发生了错误,所以我就改了。
其实,其他的数据库可以类似这种做法,给自己的项目配置一个连接类,使用的时候应该是这样的:
首先,你需要把代码放在一个单独的配置文件,比如config.py中
然后在你需要使用的地方,导入此配置文件
from config import con_analyze
class AnalyzeData:
def __init__(self):
# 此处初始化,可以带一个参数:database,默认为myanalyze
self.conn = con_analyze()
# self.conn2 = con_analyze("myanalyze_2")
def get_data(self, sql):
# 执行sql查询结果保存到df中
df = self
.conn.query(sql=sql)
def store_data(self, df):
# 将dataframe类型的数据df,存入名为dd_name的数据表中
self.conn.store(df, 'db_name')
MongoDB
mongodb是一个非结构化数据库,里面存储的数据类似于json,是键值对的形式,如果你遇到了需要查询mongodb中的数据,下面我就简单介绍一下。
![](http://mmbiz.qpic.cn/mmbiz_png/HZW0wwFxbQDibHmEQbWaOexopW9y9CZRoKdxAqeicn2Abq0L1UDMdyibuH7zpmhQvOiacCgDSkl4fiau1Nqe6zkUgDg/640?wx_fmt=png)
同样,也是要建立一个类,这是为了规范。
import pymongo
import pandas as pd
class Conn_Mongo:
"""mongo 数据库连接"""
def __init__(self):
self.mongo_utoken = pymongo.MongoClient('mongodb://***:27000').utoken # 用户表
def get_user_data_mongo(self,list_id):
"""
通过连接 mongo查找
"""
user_data = pd.DataFrame(list(self.mongo_fotor.userinfo.find({'FToken': {'$in': list(list_id)}})))
return user_data
这个毕竟简单,就是一个查询操作,我是先传入一串id,根据id找到对应的信息。一般来说,mongodb的库容量都比较大,所以我是有针对的查询相关信息。
这里用到了pymongo库,通过它创建一个到相应地址(我用*隐掉了)的连接,后面的.utoken是对应的库名称,其实你也可以把它作为参数,在初始化的时候传进去。
后面查询的时候使用了find函数,其前面的userinfo是表的名称,find的参数也是键值对的形式,这里我指定了键的名称"FToken",其值{'$in': list(list_id)}代表的意思是:在什么什么中。
将id 做成了一个list(为了大家理解,取名为list_id),相关语法大家可以查阅一下。
Flurry
如果你的工作涉及到了app的数据,那经常会使用Flurry获取数据。
Flurry是一个移动统计平台,虽然是国外的,但国内依然可以用(不像谷歌分析被禁了),ios和Android应用的运营数据都可以在上面统计查询。
如果你还没有,又想了解的,可以戳这里:Flurry
![](http://mmbiz.qpic.cn/mmbiz_png/HZW0wwFxbQDibHmEQbWaOexopW9y9CZRobNDuGdJiaicNj4mcDiaCTiauZkcfYehORHbq5ciaZDgicLNUEPU6PfjpCIEA/640?wx_fmt=png)
对,网页浏览的话,界面就是这样的。
常用的功能是用户数据
![](http://mmbiz.qpic.cn/mmbiz_png/HZW0wwFxbQDibHmEQbWaOexopW9y9CZRoKcbynxgVNL9B7q5GV4nvuvr3OT2IVzftAb6faiaic13fRgnicuoMsF9DQ/640?wx_fmt=png)
以及功能点击事件
![](http://mmbiz.qpic.cn/mmbiz_png/HZW0wwFxbQDibHmEQbWaOexopW9y9CZRo9b9WkarB6x6hVNMTUTOoFDZct5ATFXG2uhPsLb3g8wDUw5kzqKX0Sg/640?wx_fmt=png)
不过,这不是我要说的重点,上面只是让你看一下Flurry长什么样,现在我要写python接口,将这些数据取出。
Flurry的api地址,请戳这里:Flurry API
这是创建分析报告的api,有别于开发的api
首先,我们需要去申请一个app token,用于获取连接权限,申请方法请参考:app access token
它是大一串字母
![](http://mmbiz.qpic.cn/mmbiz_png/HZW0wwFxbQDibHmEQbWaOexopW9y9CZRoRflibvNbiawKUeYuy0Hazgdz0qwicCt4PEWYgrpbAD4EL2nOSWy2d2OXg/640?wx_fmt=png)
只要获取到了这个token,我们就可以创建一个url,用于获取Flurry里面的数据了,具体看如下的代码:
import pandas as pd
import json, requests
class Conn_Flurry:
"""flurry api data"""
api_token = "******.****.****"
headers = {'Authorization': 'Bearer {}'.format(api_token)}
url = "https://api-metrics.flurry.com/public/v1/data/appEvent/day/app?metrics=activeDevices,newDevices,averageTimePerDevice&dateTime=2017-05-23/2017-05-24"
def get_results(self, url=url):
'''
这里使用的url是一个示例,也可以使用get_url函数创建需要的url传入此函数作为参数
'''
data = requests.get(url, headers=self.headers)
cleaned = json.loads(data.text, 'utf-8')
cleaned = pd.DataFrame(cleaned['rows'])
return cleaned
def get_url(self, table='appEvent', timegrain=