这是Log Service SDK for Python的开源版本。Log Service SDK for Python是阿里云日志服务 (Log Service)API的Python编程接口,提供了对于Log Service Rest API所有接口的封装 和支持,帮助Python开发人员更快编程使用阿里云Log Service服务。
参考文档: http://aliyun-log-python-sdk.readthedocs.io/
不想写代码? 可以试一下命令行控制台, 其覆盖了这个SDK的几乎所有功能.
- 封装所有Rest API (管理, 数据操作, 消费组等)
- 消费组高阶类支持
- Python日志模块的Handler
- 高阶操作支持: 自动分页, 自动未完成重试, 复制Project/Logstore配置, 调整更大的Shard读写数, 查看资源使用等.
- Elasticsearch 数据迁移
- 数据ETL功能: 按照shard/时间高速跨logstore复制数据, 根据灵活的配置规则, 对数据进行批量或持续可并发的数据ETL转换.
- Python 2.6
- Python 2.7
- Python 3.3
- Python 3.4
- Python 3.5
- Python 3.6
- Python 3.7
- Pypy2
- Pypy3
- Log Service API 0.6
pip install -U aliyun-log-python-sdk
如果提示time-out
之类的错误,表示网络不通,建议可以加上国内清华的索引试一试:
pip install -U aliyun-log-python-sdk -i https://pypi.tuna.tsinghua.edu.cn/simple
如果存在安装Regex失败的错误, 可以参考使用yun
/apt-get
或者手动安装一下python-devel
https://rpmfind.net/linux/rpm2html/search.php?query=python-devel
参考SDK配置 获得访问秘钥的ID和Key以及访问入口Endpoint, 构建一个LogClient的客户端.
from aliyun.log import LogClient
# “华东 1 (杭州)” Region 的日志服务入口。
endpoint = 'cn-hangzhou.log.aliyuncs.com'
# 用户访问秘钥对中的 AccessKeyId。
accessKeyId = 'ABCDEFGHIJKLJMN'
# 用户访问秘钥对中的 AccessKeySecret。
accessKey = 'OPQRSTUVWXYZ'
client = LogClient(endpoint, accessKeyId, accessKey)
# 使用client的方法来操作日志服务
如果要使用https连接, 在配置endpoint
时, 传入https://
前缀. 例如:
endpoint = 'https://cn-hangzhou.log.aliyuncs.com'
*.aliyuncs.com
的证书是GlobalSign的, 默认大部分机器已经按照并信任, 如果您的机器没有默认信任, 可以下载证书并安装信任, 可以参考这个文档来操作.
-
获取列表
列出本region下面的所有可见项目:
res = client.list_project()
res.log_print()
注意: 默认获取100个项目,通过传入参数offset
和size
来获取更多
-
获取信息
获取单个项目的较为详细的信息.
res = client.get_project('project1')
res.log_print()
- 创建
res = client.create_project("project1", "a simple project")
res.log_print()
- 删除
res = client.delete_project("project1")
res.log_print()
注意: 只能删除空的项目.
-
复制
复制一个项目的所有日志库和相应的配置(包括机器组合索引等), 要求目标项目不存在.
res = client.copy_project("project1", "project2")
res.log_print()
日志库属于某一个项目, 所有的操作都需要传入项目名称.
-
获取列表
获取一个项目下的所有日志库:
from aliyun.log import ListLogstoresRequest
request = ListLogstoresRequest('project1')
res = client.list_logstores(request)
res.log_print()
-
创建
创建一个日志库:
res = client.create_logstore('project1', 'logstore1', ttl=30, shard_count=3)
res.log_print()
注意: 参数ttl
和shard_count
表示日志存储日期和分区数量.
-
获取信息
获取单个日志库较为详细的信息.
res = client.get_logstore('project1', 'logstore1')
res.log_print()
-
删除
通过
delete_logstore
删除日志库 -
更新
通过
update_logstore
删除日志库
分区属于某一个日志库, 所有的操作都需要传入项目名称和日志库名称.
-
获取列表
通过
list_shards
获取列表 -
分裂
通过
split_shard
分裂分区 -
合并
通过
merge_shard
合并分区
Logtail的配置拥有独立的名字, 但其与日志库(logstore)一般是一一对应的关系.
-
获取列表
列出本项目下所有Logtail的配置名单:
res = client.list_logtail_config('project1')
res.log_print()
注意: 默认获取100个配置项,通过传入参数offset
和size
来获取更多
输出:
{"count": 2, "configs": ["config_name1", "config_name2"], "total": 2}
- 创建
创建一个Logtail配置, 并关联到日志库上:
from aliyun.log import LogtailConfigGenerator as helper
import json
config_detail_json = """{
"configName": "json_1",
"inputDetail": {
"logType": "json_log",
"filePattern": "test.log",
"logPath": "/json_1"
},
"inputType": "file",
"outputDetail": {
"logstoreName": "logstore3"
}
}"""
request = helper.generate_config(json.loads(config_detail))
res = client.create_logtail_config('project1', request)
res.log_print()
注意:
-
创建的配置的名字
configName
和关联的日志库名字logstoreName
都是放在传入的request
中. -
不同类型的Logtail配置参数不一样,可以参考这篇文章了解更多业务逻辑。
-
更多的JSON样例,请参考这里。
-
创建的Logtail的配置还没有应用到任何一个机器组, 需要调用后面的API
apply_config_to_machine_group
来进行配置. -
获取信息
获取Logtail配置的具体信息:
res = client.get_logtail_config('project1', 'config1')
res.log_print()
-
修改
通过
update_logtail_config
来修改Logtail配置. -
删除
通过
delete_logtail_config
来删除Logtail配置.
机器组(MachineGroup)主要是用于应用Logtail配置的. 其与Logtail配置的关系是多对多的关系. 一个Logtail配置可以应用到多个机器组上, 放置一个机器组也可以应用多个Logtail配置.
-
获取列表
列出本项目下所有机器组的名单:
res = client.list_machine_group('project1')
res.log_print()
注意: 默认获取100个机器组,通过传入参数offset
和size
来获取更多
输出:
{"count": 2, "machinegroups": ["group_name1", "group_name2"], "total": 2}
-
创建
创建一个机器组:
from aliyun.log import MachineGroupDetail
config_detail_json = {
"group_name": "group_name1",
"machine_list": [
"machine1",
"machine2"
],
"machine_type": "userdefined",
"group_type": "Armory",
"group_attribute": {
"externalName": "ex name",
"groupTopic": "topic x"
}
}
request = MachineGroupDetail()
request.from_json(config_detail_json)
res = client.create_machine_group('project1', request)
res.log_print()
注意:
-
创建的机器组的名字
group_name
是放在传入的request
中. -
创建的机器组还没有应用到任何一个Logtail配置, 需要调用后面的API
apply_config_to_machine_group
来进行配置. -
获取信息
获取机器组的具体信息:
res = client.get_machine_group('project1', 'group1')
res.log_print()
-
修改
通过
update_logtail_config
来修改Logtail配置. -
删除
通过
delete_logtail_config
来删除Logtail配置.
机器组与Logtail配置的关系是多对多的关系. 一个Logtail配置可以应用到多个机器组上, 反之一个机器组也可以应用多个Logtail配置.
- 应用Logtail配置到特定机器组
res = client.apply_config_to_machine_group('project1', 'config1', 'group1')
res.log_print()
- 去除机器组的Logtail配置
res = client.remove_config_to_machine_group('project1', 'config1', 'group1')
res.log_print()
- 获取Logtail配置应用到的机器组名单
res = client.get_config_applied_machine_groups('project1', 'config1')
res.log_print()
输出:
{"count": 2, "machinegroups": ["group1", "group2"]}
- 获取机器组应用的Logtail配置名单
res = client.get_machine_group_applied_configs('project1', 'group1')
res.log_print()
输出:
{"count": 2, "configs": ["config1", "config2"]}
只有配置了索引的日志库才能使用SQL查询日志.
- 创建
给一个日志库创建索引
from aliyun.log import IndexConfig
request_json = {
"keys": {
"f1": {
"caseSensitive": False,
"token": [
",", " ", "\"", "\"", ";", "=", "(", ")", "[", "]",
"{", "}", "?", "@", "&", "<", ">", "/", ":", "\n", "\t"
],
"type": "text",
"doc_value": True
},
"f2": {
"doc_value": True,
"type": "long"
}
},
"storage": "pg",
"ttl": 2,
"index_mode": "v2",
"line": {
"caseSensitive": False,
"token": [
",", " ", "\"", "\"", ";", "=", "(", ")", "[", "]", "{",
"}", "?", "@", "&", "<", ">", "/", ":", "\n", "\t"
]
}
}
request = IndexConfig()
request.from_json(request_json)
res = client.create_index('project1', 'logstore1', request)
res.log_print()
更多索引样例,可以参考这里,注意,使用SDK时,需要将true/false
改成Python对应的True/False
。
-
修改
通过
update_index
修改日志库的索引 -
获取
通过
get_index_config
获得日志库的索引配置 -
删除
通过
delete_index
删除日志库的索引
- 获取日志库主题列表
from aliyun.log import ListTopicsRequest
request = ListTopicsRequest('project1', 'logstore1')
res = client.list_topic(request)
res.log_print()
有三种方式消费日志:
- 拉取数据(PullLog): 根据分区游标来消费日志: 需要指定分区(Shard)以及游标.
- 日志库查询数据(GetLog): 通过索引查询来消费特定日志库日志: 需要指定日志库与日志时间以及(或)查询条件.
- 项目统计查询数据(GetProjectLog): 通过索引查询来消费整个项目组日志: 需要指定日志时间以及(或)统计查询查询条件.
- 实时消费(Consumer Group): 第一种的高级方式, 通过服务器支持的消费组, 来并发可靠的快速拉取日志.
拉取数据需要传入游标和分区, 获取分区可以参考前面的管理日志库分区(shard)
, 这里介绍游标操作.
-
获取开头游标
获取日志库特定分区的最开头的游标.
res = client.get_begin_cursor('project1', 'logstore1', shard_id=0)
print(res.get_cursor())
-
获取结尾游标
获取日志库特定分区的结尾的游标.
res = client.get_end_cursor('project1', 'logstore1', shard_id=0)
print(res.get_cursor())
-
获取特定时间的游标
可以特定日志库分区的特定接受时间最接近的一个游标.
res = client.get_cursor('project1', 'logstore1', shard_id=0, start_time="2018-1-1 10:10:10")
print(res.get_cursor())
-
这里的
start_time
指的是服务器接受日志的时间. 也可以是begin
或者end
-
获取游标时间
获得特定日志库分区的某个游标说对应的服务器时间, 如果是结尾游标, 一般对应于服务器的的当前时间.
res = client.get_begin_cursor('project1', 'logstore1', shard_id=0)
res = client.get_cursor_time('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
print(res.get_cursor_time())
-
获取游标时间
获得特定日志库分区的某个游标的上一个游标所对应的服务器时间, 如果是开头游标, 则对应于服务器的的开头游标的时间.
res = client.get_end_cursor('project1', 'logstore1', shard_id=0)
res = client.get_previous_cursor_time('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
print(res.get_cursor_time())
根据游标获取数据, 需要传入分区.
下面例子消费分区0
一个小时前收集到的数据.
from time import time
res = client.get_cursor('project1', 'logstore1', shard_id=0, start_time=int(time())-3600)
res = client.pull_logs('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
res.log_print()
# 或者
it = client.pull_log('project1', 'logstore1', shard_id=0, from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10")
for res in it:
res.log_print()
# 或者大并发直接下载在本地
it = client.pull_log('project1', 'logstore1', from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10", file_path="/data/dump_{}.data")
for res in it:
res.log_print()
注意: 默认获取1000条, 可以通过参数count
来调节. 也可以通过参数end_cursor
来设定设定一个结束的游标.
消费特定日志库, 根据索引获取数据, 需要传入时间范围, 也可以传入查询语句. 下面的例子查询时间是过去一小时特定日志库的前100条日志.
from time import time
from aliyun.log import GetLogsRequest
request = GetLogsRequest("project1", "logstore1", fromTime=int(time()-3600), toTime=int(time()), topic='', query="*", line=100, offset=0, reverse=False)
# 或者
request = GetLogsRequest("project1", "logstore1", fromTime="2018-1-1 10:10:10", toTime="2018-1-1 10:20:10", topic='', query="*", line=100, offset=0, reverse=False)
res = client.get_logs(request)
res.log_print()
也可以通过接口get_log
来获取
from time import time
res = client.get_log("project1", "logstore1", from_time=int(time()-3600), to_time=int(time()), size=-1)
# 或者
res = client.get_log("project1", "logstore1", from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10", size=-1)
res.log_print()
这里传入size=-1
可以获取所有.
通过get_histograms
来根据索引获取数据特定日志时间范围内的分布图.
跨日志库的对整个项目组查询, 根据索引获取数据, 需要传入时间范围和查询语句. 注意: 因为跨项目组传输数据可能非常大从而影响实际的性能, 推荐使用统计查询SQL来降低传输数据量.
下面的例子查询时间是一段时间内的项目组的几个日志库的日志数量.
from aliyun.log import GetProjectLogsRequest
req = GetProjectLogsRequest("project1","select count(1) from logstore1, logstore2, logstore3 where __date__ >'2017-11-10 00:00:00' and __date__ < '2017-11-13 00:00:00'")
res = client.get_project_logs(req)
res.log_print();
注意:注意SQL中使用的字段需要在索引中配置好,例如上面例子中的__date__
通过消费组(Consumer Group)可以获得可保障的自动扩展的日志消费服务.
可以参考这这几篇实战文章与样例。
高级接口已经对基础接口进行了封装. 个别情况下也可以通过基础接口进行一些特定的操作.
-
获取列表
通过
list_consumer_group
h获得当前消费组列表. -
创建
通过
create_consumer_group
创建一个消费组. -
更新
通过
update_consumer_group
更新一个消费组, 例如延迟和消费顺序等. -
删除
通过
delete_consumer_group
删除一个消费组. -
获取消费进度
可以通过
get_check_point
获得消费组的消费检查点(Checkpoint), 来了解消费进度信息 -
更新消费进度
消费者需要通过
update_check_point
来存储和更新消费检查点(Checkpoint)
管理报表
-
获取列表
通过
list_dashboard
获取报表的列表 -
创建报表
通过
create_dashboard
创建一个报表. 传入的结构是一个字典对象,如下:
{
"charts": [
{
"display": {
"displayName": "",
"height": 5,
"width": 5,
"xAxis": [
"province"
],
"xPos": 0,
"yAxis": [
"pv"
],
"yPos": 0
},
"search": {
"end": "now",
"logstore": "access-log",
"query": "method: GET | select ip_to_province(remote_addr) as province , count(1) as pv group by province order by pv desc ",
"start": "-86400s",
"topic": ""
},
"title": "map",
"type": "map"
},
{
"display": {
"displayName": "",
"height": 5,
"width": 5,
"xAxis": [
"province"
],
"xPos": 5,
"yAxis": [
"pv"
],
"yPos": 0
},
"search": {
"end": "now",
"logstore": "access-log",
"query": "method: POST | select ip_to_province(remote_addr) as province , count(1) as pv group by province order by pv desc ",
"start": "-86400s",
"topic": ""
},
"title": "post_map",
"type": "map"
}
],
"dashboardName": 'dashboard_1',
"description": ""
}
-
获取报表
通过
get_dashboard
获取一个报表的具体信息. -
更新报表
通过
update_dashboard
更新一个报表,传入的结构与创建一样。 -
删除报表
通过
delete_dashboard
删除一个报表.
管理报警
-
获取报警
通过
list_alert
获取报警的列表 -
创建报警
通过
create_alert
创建一个报警. 传入的结构是一个字典对象,如下:
{
"name": 'alert_1',
"displayName": "Alert for testing",
"description": "",
"type": "Alert",
"state": "Enabled",
"schedule": {
"type": "FixedRate",
"interval": "5m",
},
"configuration": {
"condition": "total >= 100",
"dashboard": "dashboard-test",
"queryList": [
{
"logStore": "test-logstore",
"start": "-120s",
"end": "now",
"timeSpanType": "Custom",
"chartTitle": "chart-test",
"query": "* | select count(1) as total",
}
],
"notificationList": [
{
"type": "DingTalk",
"serviceUri": "http://xxxx",
"content": "Message",
},
{
"type": "MessageCenter",
"content": "Message",
},
{
"type": "Email",
"emailList": ["[email protected]"],
"content": "Email Message",
},
{
"type": "SMS",
"mobileList": ["132373830xx"],
"content": "Cellphone message"
}
],
"muteUntil": 1544582517,
"notifyThreshold": 1,
"throttling": "5m",
}
}
-
获取报警
通过
get_alert
获取一个报警的具体信息. -
更新报警
通过
update_alert
更新一个报警,传入的结构与创建一样。 -
删除报警
通过
delete_alert
删除一个报警.
管理快速查询
-
获取快速查询
通过
list_savedsearch
获取快速查询的列表 -
创建快速查询
通过
create_savedsearch
创建一个快速查询. 传入的结构是一个字典对象,如下:
{
"logstore": "test2",
"savedsearchName": 'search_1',
"searchQuery": "boy | select sex, count() as Count group by sex",
"topic": ""
}
-
获取快速查询
通过
get_savedsearch
获取一个快速查询的具体信息. -
更新快速查询
通过
update_savedsearch
更新一个快速查询,传入的结构与创建一样。 -
删除快速查询
通过
delete_savedsearch
删除一个快速查询.
管理外部存储, 参考文章
-
获取外部存储列表
通过
list_external_store
获取快速查询的列表 -
创建外部存储
通过
create_external_store
创建一个快速查询. 传入的结构是一个字典对象,如下:
{
"externalStoreName": "rds_store4",
"storeType": "rds-vpc",
"parameter": {
"vpc-id": "vpc-m5eq4irc1pucpk85frr5j",
"instance-id": "i-m5eeo2whsnfg4kzq54ah",
"host": "1.2.3.4",
"port": "3306",
"username": "root",
"password": "123",
"db": "meta",
"table": "join_meta",
"region": "cn-qingdao"
}
}
-
获取外部存储
通过
get_external_store
获取一个快速查询的具体信息. -
更新外部存储
通过
update_external_store
更新一个快速查询,传入的结构与创建一样。 -
删除外部存储
通过
delete_external_store
删除一个快速查询.
投递的配置一般称为Job, 包含了投递的具体配置以及调度日程安排. 而某一个具体时间的运行实例称为Task.
-
获取配置列表
通过
list_shipper
获取投递配置的列表 -
创建配置
通过
create_shipper
创建一个投递配置. 可以传入一个json字符串(如下)或者对应的dict对象(如get_shipper获得的对象结果),如下:
OSS以CSV格式投递:
{
"shipperName": "oss-ship-test1",
"targetConfiguration": {
"bufferInterval": 300,
"bufferSize": 256,
"compressType": "none",
"enable": true,
"ossBucket": "bucket1",
"ossPrefix": "bucket_folder1",
"pathFormat": "%Y/%m/%d/%H/%M",
"roleArn": "acs:ram::1234:role/aliyunlogdefaultrole",
"storage": {
"detail": {
"columns": [
"k3",
"k4"
],
"delimiter": "|",
"header": false,
"nullIdentifier": "",
"quote": "\""
},
"format": "csv"
}
},
"targetType": "oss"
}
OSS以JSON格式投递(注意storage.detail.columns为空是必须的,这个和get_shipper有所差别):
{
"shipperName": "oss-ship-test1",
"targetConfiguration": {
"bufferInterval": 300,
"bufferSize": 256,
"compressType": "snappy",
"enable": true,
"ossBucket": "bucket1",
"ossPrefix": "bucket_folder1",
"pathFormat": "%Y/%m/%d/%H/%M",
"roleArn": "acs:ram::1234:role/aliyunlogdefaultrole",
"storage": {
"detail": {
"enableTag": true,
"columns": [
]
},
"format": "json"
}
},
"targetType": "oss"
}
-
获取配置
通过
get_shipper
获取一个投递配置的具体信息. -
更新配置
通过
update_shipper
更新一个投递配置. -
删除配置
通过
delete_shipper
删除一个投递配置. -
获取运行实例列表
通过
get_shipper_tasks
获取投递运行实例. -
重试运行实例
通过
retry_shipper_tasks
重试某一个运行实例.
用于将 Elasticsearch 中的数据迁移至日志服务。
- 将 hosts 为
localhost:9200
的 Elasticsearch 中的所有文档导入日志服务的项目project1
中。
migration_manager = MigrationManager(hosts="localhost:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key)
migration_manager.migrate()
- 指定将 Elasticsearch 中索引名以
myindex_
开头的数据写入日志库logstore1
,将索引index1,index2
中的数据写入日志库logstore2
中。
migration_manager = MigrationManager(hosts="localhost:9200,other_host:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key,
logstore_index_mappings='{"logstore1": "myindex_*", "logstore2": "index1,index2"}}')
migration_manager.migrate()
- 使用参数 query 指定从 Elasticsearch 中抓取
title
字段等于python
的文档,并使用文档中的字段date1
作为日志的 time 字段。
migration_manager = MigrationManager(hosts="localhost:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key,
query='{"query": {"match": {"title": "python"}}}',
time_reference="date1")
migration_manager.migrate()
- 日志服务与SIEM(如Splunk)集成方案
- 使用消费组实时分发数据
- 使用消费组实时实时跨域监测多日志库数据
- 使用Log Handler自动上传Python日志
- Log Handler自动解析KV格式的日志
- Log Handler自动解析JSON格式的日志
- Elasticsearch 数据迁移
- 日志服务产品介绍:http://www.aliyun.com/product/sls/
- 日志服务产品文档:https://help.aliyun.com/product/28958.html
- 其他问题请提工单