Deploying MySQL master-slave replication with docker-compose: mysql-master, mysql-slave, redis, elasticsearch
Project services:
elasticsearch: stores the system logs for visualization in Kibana; MySQL data is synced to ES to power filter queries in the app.
System logs are kept in two places: 1. in ES, 2. in log files on the server.
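For reference, a minimal docker-compose sketch for the services named in the title. The image tags, passwords, and port mappings are placeholders I am assuming, not the project's actual config, and elasticsearch is omitted here because this project uses the AWS-managed domain linked below. The essential detail for replication is that master and slave get distinct server-ids (1 and 2, which is why the sync script later uses 3):

version: "3"
services:
  mysql-master:
    image: mysql:5.7                          # placeholder tag
    environment:
      MYSQL_ROOT_PASSWORD: change-me          # placeholder
    command: --server-id=1 --log-bin=mysql-bin
    ports:
      - "3306:3306"
  mysql-slave:
    image: mysql:5.7
    environment:
      MYSQL_ROOT_PASSWORD: change-me          # placeholder
    command: --server-id=2 --relay-log=relay-bin
    ports:
      - "3307:3306"
    depends_on:
      - mysql-master
  redis:
    image: redis:5
    ports:
      - "6379:6379"

The slave still needs CHANGE MASTER TO ... and START SLAVE run against it once both containers are up.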
ES management console on AWS: https://ap-south-1.console.aws.amazon.com/es/home?region=ap-south-1
Kibana URL: https://search-******-d4wrxzi5nw5xlaboupwtl26aka.ap-south-1.es.amazonaws.com/_plugin/kibana/
Periodically delete logs older than one month (see my Youdao Cloud notes for detailed ES usage):
from elasticsearch import Elasticsearch

es = Elasticsearch("https://<your-es-endpoint>")  # client for the AWS ES domain above
index_name = "pro_api_log"

def es_log_clean():
    # delete documents whose `now` timestamp field is older than 30 days
    delete_query = {
        "query": {
            "range": {
                "now": {
                    "lt": "now-30d/d"
                }
            }
        }
    }
    res = es.delete_by_query(index=index_name, body=delete_query)
    print(res)

or, equivalently, via the REST API:

POST /pro_api_log/_delete_by_query
{
  "query": {
    "range": {
      "now": {
        "lt": "now-30d/d"
      }
    }
  }
}
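To actually run this periodically you need a scheduler; the original doesn't say which one is used, so as an illustration only, here is a sketch with APScheduler (a plain cron entry invoking the script works just as well):

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
# run the cleanup every day at 03:00
scheduler.add_job(es_log_clean, 'cron', hour=3)
scheduler.start()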
Server logs (rotated daily):
import logging
import logging.handlers

def init_logger(logger_name, config, spe_name=None):
    # [%(pathname)s:%(lineno)d] can be added to the format for source locations
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
    # create a logger
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)
    # file handler: rotate once a day ('D', interval=1), keep 10 or 30 backups
    if spe_name and isinstance(spe_name, str):
        file_handler = logging.handlers.TimedRotatingFileHandler(spe_name, 'D', 1, 10, 'UTF-8')
    else:
        file_handler = logging.handlers.TimedRotatingFileHandler('/var/log/wemore.log', 'D', 1, 30, 'UTF-8')
    file_handler.setLevel(config.log_level)
    file_handler.setFormatter(formatter)
    # email_handler = logging.handlers.SMTPHandler(
    #     (config.email["Server"], config.email["Port"]),
    #     config.email["From"],
    #     config.email["To"],
    #     config.email["Subject"],
    #     credentials=(config.email["From"], config.email["Credit"])
    # )
    # email_handler.setLevel(config.email['Level'])
    # attach the handlers to the logger
    # logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    # logger.addHandler(email_handler)
    return logger

log = init_logger('flask_app', conf)
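Usage is then the same as any stdlib logger; TimedRotatingFileHandler rotates every 24 hours and renames the old file with a date suffix (e.g. wemore.log.2020-01-01):

# `log` is the logger returned by init_logger above
log.info("service started")
log.warning("slow query took %.2fs", 1.83)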
Syncing MySQL data to ES. How it works: python-mysql-replication is built on top of the MySQL replication protocol; it masquerades as a slave and continuously pulls binlog events from the MySQL server and parses them.
import time

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import *
from pymysqlreplication.event import RotateEvent
Since mysql-master and mysql-slave already use server_id 1 and 2, we set it to 3 here.
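The stream below also references a few variables the post doesn't define; a sketch of typical values (credentials, schema, and table names are placeholders), with connection_settings in the dict form pymysqlreplication expects:

MYSQL_SETTINGS = {
    "host": "127.0.0.1",   # the mysql-master host
    "port": 3306,
    "user": "repl",        # placeholder account with REPLICATION SLAVE privilege
    "passwd": "secret"     # placeholder
}
only_events = [WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, RotateEvent]
only_schemas = ["mydb"]    # placeholder: schema to watch
only_tables = ["orders"]   # placeholder: tables to watch
log_file, log_pos = None, None   # or a previously saved binlog position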
# When tailing a cluster's binlog, this server_id must not collide with
# any server_id already used in the cluster.
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                            server_id=3,
                            blocking=True,
                            only_events=only_events,
                            only_schemas=only_schemas,
                            only_tables=only_tables,
                            log_file=log_file,
                            log_pos=log_pos,
                            resume_stream=True)
# log_file and log_pos only take effect when resume_stream is True
for binlogevent in stream:
    ct = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    print("%s : --%s--" % (ct, binlogevent.__class__.__name__), flush=True)
    print(binlogevent.dump())
    handler = None
    if binlogevent.__class__.__name__ == "UpdateRowsEvent":
        handler = UpdateRowsEventHandler(binlogevent.table, 'update', binlogevent.rows)
    elif binlogevent.__class__.__name__ == "WriteRowsEvent":
        handler = WriteRowsEventHandler(binlogevent.table, 'write', binlogevent.rows)
    elif binlogevent.__class__.__name__ == "DeleteRowsEvent":
        handler = DeleteRowsEventHandler(binlogevent.table, 'delete', binlogevent.rows)
    else:
        pass  # other event types (e.g. RotateEvent) are ignored here
    if not handler:
        continue
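The UpdateRowsEventHandler / WriteRowsEventHandler / DeleteRowsEventHandler classes are the author's own and aren't shown in this post. As an illustration of the idea (not the author's actual code), a minimal write handler might mirror each inserted row into an ES index named after the table, assuming every table has an `id` primary key:

class WriteRowsEventHandler:
    """Hypothetical handler: index newly inserted rows into ES."""

    def __init__(self, table, action, rows):
        self.table = table    # source table name, reused as the ES index name
        self.action = action  # 'write'
        self.rows = rows      # pymysqlreplication rows: [{"values": {...}}, ...]

    def handle(self):
        for row in self.rows:
            values = row["values"]
            es.index(index=self.table, id=values["id"], body=values)

A delete handler would call es.delete with row["values"]["id"], and an update handler would re-index row["after_values"]; after the if/elif chain the loop would then call handler.handle(). The RotateEvent import above suggests the author also persisted log_file/log_pos so the stream can resume after a restart; that bookkeeping is omitted here.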
This is an original article by Davidvivi. Feel free to repost without contacting me, but please note that it comes from the Davidvivi blog, weixia.xin. My WeChat: ww646904527 (mention the blog when adding me).