使用Flask+AdminLTE 开发自己的管理平台

题记

前段时间没什么事情, 用Flask写了一个web manager,目前还在不断完善

主要完成的模块包含有:

  • 网易邮箱的邮件管理
  • 域名解析管理
  • 集成grafanadashboard
  • 使用Ansible 2.0 API完成了命令批量执行
  • Ansible-CMDB构建CMDB
示例

实现

服务监控
这一部分主要是依赖于之前搭建的基于open-falcon的监控系统,由于数据展示用的是grafana,所以使用的grafanadashboardsnapshot功能直接分享到这个系统上。

邮件接受
主要的code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# coding: utf-8
import poplib
from email.parser import Parser
from email.header import decode_header
from email.utils import parseaddr, parsedate_tz
from datetime import datetime
from app.utils.personal_config import pop3_server, email, password
from app import db
from app.modles import Email, SyncLog
email_info_lst = ['From', 'To', 'Subject', 'Date']
def decode_str(s):
value, charset = decode_header(s)[0]
if charset:
value = value.decode(charset)
return value
def guess_charset(msg):
charset = msg.get_charset()
if charset is None:
content_type = msg.get('Content-Type', '').lower()
pos = content_type.find('charset=')
if pos >= 0:
charset = content_type[pos + 8:].strip()
return charset
def get_header_value(msg, header):
value = msg.get(header, '')
if value:
if header == 'Subject':
value = decode_str(value)
elif header == 'Date':
print(msg.get('Date'))
try:
date_time = datetime.strptime(msg.get('Date'), "%a, %d %b %Y %H:%M:%S %z (%Z)")
value = date_time.strftime('%Y-%m-%d %H:%M')
except ValueError as e:
date_time = parsedate_tz(msg.get('Date'))
value = "%s-%s-%s %s:%s" % date_time[0:5]
else:
hdr, addr = parseaddr(value)
name = decode_str(hdr)
value = u'%s %s' % (name, addr)
return value
def save_info(msg):
email = Email()
for header in email_info_lst:
value = get_header_value(msg, header)
if header is email_info_lst[0]:
email.mail_sender = value
elif header is email_info_lst[1]:
email.mail_receiver = value
elif header is email_info_lst[2]:
email.subject = value
else:
email.time = value
db.session.add(email)
def email_sync():
try:
server = poplib.POP3(pop3_server)
server.user(email)
server.pass_(password)
resp, mails, octets = server.list()
index = len(mails)
sync = SyncLog.query.order_by(SyncLog.ptr.desc()).first()
if sync:
start = sync.ptr
else:
start = 1
if start < index+1:
for i in range(start, index + 1):
resp, lines, octets = server.retr(i)
msg_content = b'\r\n'.join(lines).decode('utf-8', 'ignore')
msg = Parser().parsestr(msg_content)
save_info(msg)
new_sync = SyncLog(
ptr=index+1,
has_view=False
)
db.session.add(new_sync)
db.session.commit()
info = '更新完成'
else:
info = '没有需要更新的邮件'
except Exception as e:
info = '更新失败: %s' % e
return info

域名解析管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# coding: utf-8
import requests
from app.utils.personal_config import dns_login_token
from app.modles import RecordInfo
from app import db
from datetime import datetime
records_url = 'https://dnsapi.cn/Record.{}'
def records_sync():
data = {
'login_token': dns_login_token,
'format': 'json',
'domain_id': '28921413'
}
res = requests.post(records_url.format('List'), data=data)
if res.status_code == 200:
res_data = res.json()
status_code = res_data.get('status').get('code')
if status_code == '1':
# status code = 1 means requests get 'Action completed successful'
records_data = res_data.get('records')
record_new_sp_id_lst = set()
record_old_sp_id_lst = db.session.query(RecordInfo.sp_id).group_by(RecordInfo.sp_id).all()
record_old_sp_id_lst = set([v[0] for v in record_old_sp_id_lst])
for record in records_data:
record_new_sp_id_lst.add(record['id'])
if not RecordInfo.query.filter_by(sp_id=record['id']).first():
if record['enabled'] == '1':
records = RecordInfo()
records.sp_id = record['id']
records.domain_name = res_data['domain']['name']
records.name = record['name']
records.type = record['type']
records.value = record['value']
records.updated_time = record['updated_on']
records.ttl = record['ttl']
records.use_status = True
records.monitor_status = 'unknown'
db.session.add(records)
for record_id in record_old_sp_id_lst - record_new_sp_id_lst:
record = RecordInfo.query.filter_by(sp_id=record_id).first()
if record:
db.session.delete(record)
db.session.commit()
return 'sync succeed'
else:
return 'sync failed'
def records_add(name, value, record_type, domain_name='zhxfei.com'):
data = {
'login_token': dns_login_token,
'format': 'json',
'sub_domain': name,
'record_type': record_type,
'record_line': '默认',
'value': value,
'domain_id': '28921413'
}
res = requests.post(records_url.format('Create'), data=data)
if res.status_code == 200 and res.json()['status']['code'] == '1':
record = res.json()['record']
records = RecordInfo()
records.sp_id = record['id']
records.name = name
records.domain_name = domain_name
records.type = record_type
records.value = value
records.updated_time = datetime.now()
records.ttl = '600'
records.use_status = True
records.monitor_status = 'unknown'
db.session.add(records)
db.session.commit()
return res.json()['status']['message']
def record_delete(record_id):
data = {
'login_token': dns_login_token,
'format': 'json',
'record_id': record_id,
'domain_id': '28921413'
}
res = requests.post(records_url.format('Remove'), data=data)
return res.json()['status']['message']
def record_modify(record_id):
pass

资产管理
关于Ansible-CMDB的介绍使用请看github

本人使用的服务器,目前主要是阿里云和腾讯云,我需要做一些服务器信息的收集,如我需要知道服务器的过期时间,公网带宽的大小等等,主要也是对服务器的信息通过api收集,之后通过datatable进行展示

以腾讯云为例API文档

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# coding: utf-8
from QcloudApi.qcloudapi import QcloudApi
import requests
def req_url_generate():
from app.utils.personal_config import tencent_secret_id, tencent_secret_key, region_lst
module = 'cvm'
action = 'DescribeInstances'
config = {
'secretId': tencent_secret_id,
'secretKey': tencent_secret_key,
'method': 'get'
}
params = {
'SignatureMethod': 'HmacSHA1',
}
req_url_lst = []
for region in region_lst:
config['Region'] = region
service = QcloudApi(module, config)
req_url_lst.append(service.generateUrl(action, params))
return req_url_lst
def parse_message_dict(dct):
message = []
for k, v in dct.items():
message.append(str(k)+': '+str(v))
return ';'.join(message)
def get_tx_vps_data():
res_lst = []
for req_url in req_url_generate():
res = requests.get(req_url, timeout=3)
result = res.json()
if result['code'] == 0:
info = result['instanceSet']
for sp in info: # info is a list
for k, v in sp.items():
if isinstance(v, list):
sp[k] = ','.join(v)
if isinstance(v, dict):
sp[k] = parse_message_dict(v)
sp['sp_name'] = 'tencent'
res_lst += info
return res_lst
def get_tx_vps_data_final():
res_lst = []
tx_data = get_tx_vps_data()
'''
tx_data_k_type_lst = ['cpu',
'wanIpSet',
'createTime',
'status',
'os',
'zoneName',
'mem',
'deadlineTime',
'lanIp'
'bandwidth',
'sp_name',]
'''
for info in tx_data:
# info = {k: _ for k, _ in info.items() if k in tx_data_k_type_lst}
res_lst.append(info)
return res_lst

命令执行
主要是使用的Ansible,对Ansible本文就不过多介绍,我使用的它的最主要的原因是十分轻量,它直接使用ssh进行操作,而不是像puppet/saltstack等其他自动化运维工具,需要在每台机器上装Agent

值得注意的是AnsibleapiAnsible的版本相关,Ansible 2.0Ansible 1.0的差距也是蛮大的,相比1.0新的版本要复杂的多

本人对Ansible 2.0 apimodule部分进行了封装,需要注意的是,目前还没有使用动态的inventory文件,之后应该会实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python
# import json, logging
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
# logging = logging.getLevelName()
class ResultCallback(CallbackBase):
"""A sample callback plugin used for performing an action as results come in
If you want to collect all results into a single object for processing at
or writing your own custom callback plugin
"""
def __init__(self, *args, **kwargs):
super(ResultCallback, self).__init__(*args, **kwargs)
self.job_id = 0
self.result_host_all = {}
self.result_host_ok = {}
self.result_host_failed = {}
self.result_host_unreachable = {}
self.result_host_stdout_lines = {}
self.result_host_stderr_lines = {}
self.result_has_stderr_lines = False
self.result_has_stdout_lines = False
def v2_runner_on_ok(self, result, **kwargs):
self.job_id += 1
host = result._host.get_name() + ' job_' + str(self.job_id)
self.result_host_all[host] = result._result
self.result_host_ok[host] = result._result
if result._result.get('stdout_lines'):
self.result_has_stdout_lines = True
self.result_host_stdout_lines[host] = result._result.get('stdout_lines')
def v2_runner_on_failed(self, result, ignore_errors=False):
self.job_id += 1
host = result._host.get_name() + ' job_' + str(self.job_id)
self.result_host_all[host] = result._result
self.result_host_failed[host] = result._result
if result._result.get('stderr_lines'):
self.result_has_stderr_lines = True
self.result_host_stderr_lines[host] = result._result.get('stderr_lines')
def v2_runner_on_unreachable(self, result):
self.job_id += 1
host = result._host.get_name() + ' job_' + str(self.job_id)
self.result_host_all[host] = result._result
self.result_host_unreachable[host] = result._result
def _parse_task(task_lst):
tasks = []
if task_lst:
for task in task_lst:
module = task.get('module')
args = task.get('args')
tasks.append(dict(action=dict(module=module, args=args)))
return tasks
class AnsibleRun(object):
Options = namedtuple('Options',
['connection',
'module_path',
'forks',
'become',
'become_method',
'become_user',
'check',
'diff',
'sudo',
'timeout'])
def __init__(self, hosts, result_callback=None):
self.loader = DataLoader()
self.options = AnsibleRun.Options(connection='ssh',
module_path='../../env/lib/python3.5/site-packages/ansible/modules/',
forks=100,
sudo='yes',
become=None,
become_method=None,
become_user='root',
check=False,
diff=False,
timeout=3)
self.passwords = dict(vault_pass='secret')
self.hosts = hosts
self.inventory = InventoryManager(loader=self.loader, sources=['/etc/ansible/hosts'])
self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)
self.result_callback = result_callback if result_callback else ResultCallback()
def module_run(self, task_lst):
"""
task_lst is a list for dict, Just like :
[
{
'module': 'your_self_module',
'args': 'args=sssss'
},
{
'module': 'shell',
'args': 'ifconfig'
}
]
:param task_lst:
:return None:
"""
tasks = _parse_task(task_lst)
play_source = dict(
name="Ansible Play",
hosts=self.hosts,
gather_facts='no',
tasks=tasks
)
play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
# actually run it
tqm = None
try:
tqm = TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords=self.passwords,
stdout_callback=self.result_callback,
# Use our custom callback instead of the ``default`` callback plugin
)
result = tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
def play_book_run(self):
pass
def get_result(self, result_type='all'):
params_allow_lst = ['result_all',
'result_ok',
'result_stdout_lines',
'result_stderr_lines',
'result_failed',
'result_unreachable']
assert result_type in params_allow_lst, 'result_type must in {params_allow_lst}'.format(
params_allow_lst=params_allow_lst)
if result_type == 'result_all':
return self.result_callback.result_host_all
if result_type == 'result_ok':
return self.result_callback.result_host_ok
if result_type == 'result_failed':
return self.result_callback.result_host_failed
if result_type == 'result_unreachable':
return self.result_callback.result_host_unreachable
if result_type == 'result_stdout_lines':
if self.result_callback.result_has_stdout_lines:
return self.result_callback.result_host_stdout_lines
if result_type == 'result_stderr_lines':
if self.result_callback.result_has_stderr_lines:
return self.result_callback.result_host_stderr_lines
def test():
ansible_client = AnsibleRun('localhost')
ansible_client.module_run([
# {
# 'module': 'echo',
# 'args': 'args=sssss'
# },
{
'module': 'cron',
'args': "name='just a test echo' job='echo hello world' minute='*/1'"
}
])
out = ansible_client.get_result('result_all')
print(out)
out = ansible_client.get_result('result_ok')
print(out)
out = ansible_client.get_result('result_stdout_lines')
print(out)
out = ansible_client.get_result('result_failed')
print(out)
out = ansible_client.get_result('result_stderr_lines')
print(out)
out = ansible_client.get_result('result_unreachable')
print(out)

之后在Flask的视图函数中直接调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@admin.route('/command/exec/', methods=['GET', 'POST'])
@login_req
def command_exec():
form = CommandCommitForm()
form.result.render_kw['rows'] = 10
res_count = None
if form.validate_on_submit():
host = form.data['host']
host_name_lst = [h for k, h in host_type if k in host]
command = form.data['content']
if is_sec(command):
runner = AnsibleRun(host_name_lst)
runner.module_run([
{
'module': 'shell',
'args': command
}
])
res_body = """"""
res_stdout = runner.get_result('result_stdout_lines')
std_out_count = len(res_stdout) if res_stdout else 0
if res_stdout:
res_body += '正确输出:\n'
for host_job, std in res_stdout.items():
res_body += host_job + '\n' * 2 + '\n'.join([' '*4 + v for v in std]) + '\n'
res_body += '---' * 30 + '\n'
res_stderr = runner.get_result('result_stderr_lines')
std_err_count = len(res_stderr) if res_stderr else 0
if res_stderr:
res_body += '错误输出:\n'
for host_job, std in res_stderr.items():
res_body += host_job + '\n' * 2 + '\n'.join([' '*4 + v for v in std]) + '\n'
res_body += '---' * 30 + '\n'
res_unreachable = runner.get_result('result_unreachable')
unreachable_count = len(res_unreachable) if res_unreachable else 0
if res_unreachable:
res_body += '不可达输出:\n'
for host_job, std in res_unreachable.items():
res_body += host_job + '\n' + ' ' +std['msg'] + '\n'
res_body += '---' * 30 + '\n'
form.result.data = res_body
form.result.render_kw['rows'] = 40
global res_count
res_count = (std_out_count, std_err_count, unreachable_count)
else:
flash('command not allowed', 'failed')
return render_template('admin/command_exec.html', form=form, res_count=res_count)

目前测试,在Ansible上执行批量处理,虽然页面是阻塞加载,页面响应时间在4s左右,还能接受,以后还能再优化。

完整代码见这里:My-Admin

坚持原创技术分享,您的支持将鼓励我继续创作!