В одной из последних версий RPC-сервиса на NodeJS реализовано хранение логов в формате совместимом с CSV (разделителем выступает табуляция \t).
Для загрузки логов потребуется вспомогательная функция load_content:
# функция загружает файлы логов с сервера с глубиной deep def load_content(url, deep=7): # признак продолжения загрузки файлов loading = True # кол-во обработанных файлов count = 0 # загруженный контент content = '' # размер загруженного контента в байтах length = 0 while loading: with r.Session() as s: if count == 0: download = s.get(url) else: download = s.get(url + '.' + str(count)) status_code = download.status_code if status_code == 200: length += int(download.headers['Content-length']) if count == 0: content = download.content.decode('utf-8') else: content += ('\n' + download.content.decode('utf-8')) count += 1 # принудительно завершаем загрузку if count >= deep: loading = False else: loading = False return (count, length, content)
Она принимает два параметра:
- url: String — адресная строка, где хранится лог файл
- deep:Int — «глубина» анализа. Про структуру хранения логов можно подробнее прочитать тут
Шаг 1. Подгружаем библиотеки
import pandas as pd import io
Шаг 2. Загружаем контент (логи)
# адрес для загрузки лог файла CSV_URL = 'http://url.address/log-file-name.log' # тут вызываем функцию load_content (её объявили выше) # и загружает последние 3 файла log_data = load_content(CSV_URL, deep=3) # тут хранится действительное кол-во файлов, которое было загружено log_count = log_data[0] # размер загруженных данных в байтах log_size = log_data[1] # строка с данными log_content = log_data[2]
Шаг 3. Создаём DataFrame
# считываем данные при помощи pandas dc = pd.read_csv(io.StringIO(log_content), sep='\t', on_bad_lines='error', names=['datetime', 'type', 'category', 'message', 'user_id', 'port', 'claims', 'duration', 'size', 'name']) # далее преобразовываем в читабельный формат dc = dc.replace('undefined', None) dc['port'] = pd.to_numeric(dc['port'], errors='coerce') dc['user_id'] = pd.to_numeric(dc['user_id'], errors='coerce') dc['datetime'] = pd.to_datetime(dc['datetime'], format='%Y-%m-%dT%H:%M:%S.%fZ', errors='coerce') dc['datetime'] = dc['datetime'] + timedelta(hours=3) dc['date'] = dc['datetime'].dt.date dc['hour'] = dc['datetime'].dt.hour dc['duration'] = pd.to_numeric(dc['duration'], errors='coerce') dc['size'] = pd.to_numeric(dc['size'], errors='coerce') dc = dc.dropna(subset=['datetime']).reset_index(drop=True)
При обработке файла доступна следующая информация:
- datetime — дата и время в московском часовом поясе
- type — тип лога: ERROR, DEBUG, LOG
- category — категория лога
- message — текстовая информация
- user_id — идентификатор пользователя
- port — локальный порт процесса
- claims — роли пользователя
- duration — продолжительность запроса в миллисекундах
- size — размерность, для некоторых категорий содержит информацию в байтах, а для некоторых (RPC_PACKAGE) — размерность массива
- name — пользовательское имя функции
Примечание: на этом можно было и закончить, но можно ещё отфильтровать действия пользователя.
Шаг 4. Действия пользователя
Достаем информацию об ответах от сервера. Для этого проверяем все строки с категориями RPC_RESPONSE, REPORT_RESPONSE, SYNC_DURATION.
- RPC_RESPONSE — ответы от RPC
- REPORT_RESPONSE — выгрузка отчётов
- SYNC_DURATION — выполнение синхронизации на сервере
На основе полученной информации формируем новый DataFrame:
df = dc.loc[dc['category'].isin(['RPC_RESPONSE', 'SYNC_DURATION', 'REPORT_RESPONSE']), ['datetime', 'date', 'hour', 'name', 'category', 'duration', 'user_id', 'port', 'claims', 'size']] df['category'] = df['category'].replace({'RPC_RESPONSE': 'rpc', 'SYNC_DURATION': 'sync', 'REPORT_RESPONSE': 'report'})
Примечание: на основе полученной информации можно отслеживать, какие запросы самые долгие, сколько они выполнялись и кем.