Анализ логов с от RPC-сервиса на NodeJS

В одной из последних версий RPC-сервиса на NodeJS реализовано хранение логов в формате совместимом с CSV (разделителем выступает табуляция \t).

Для загрузки логов потребуется вспомогательная функция load_content:

# функция загружает файлы логов с сервера с глубиной deep
def load_content(url, deep=7):
    # признак продолжения загрузки файлов
    loading = True
    # кол-во обработанных файлов
    count = 0
    # загруженный контент
    content = ''
    # размер загруженного контента в байтах
    length = 0

    while loading:
        with r.Session() as s:       
            if count == 0:
                download = s.get(url)
            else:
                download = s.get(url + '.' + str(count))

            status_code = download.status_code

            if status_code == 200:
                length += int(download.headers['Content-length'])
                
                if count == 0:
                    content = download.content.decode('utf-8')
                else:
                    content += ('\n' + download.content.decode('utf-8'))

                count += 1
                
                # принудительно завершаем загрузку
                if count >= deep:
                    loading = False
            else:
                loading = False

    return (count, length, content)

Она принимает два параметра:

  • url: String — адресная строка, где хранится лог файл
  • deep:Int — «глубина» анализа. Про структуру хранения логов можно подробнее прочитать тут

Шаг 1. Подгружаем библиотеки

import pandas as pd

import io

Шаг 2. Загружаем контент (логи)

# адрес для загрузки лог файла
CSV_URL = 'http://url.address/log-file-name.log'

# тут вызываем функцию load_content (её объявили выше)
# и загружает последние 3 файла
log_data = load_content(CSV_URL, deep=3)

# тут хранится действительное кол-во файлов, которое было загружено 
log_count = log_data[0]
# размер загруженных данных в байтах
log_size = log_data[1]
# строка с данными
log_content = log_data[2]

Шаг 3. Создаём DataFrame

# считываем данные при помощи pandas
dc = pd.read_csv(io.StringIO(log_content), sep='\t', on_bad_lines='error', 
               names=['datetime', 'type', 'category', 'message', 'user_id', 'port', 'claims', 'duration', 'size', 'name'])
# далее преобразовываем в читабельный формат
dc = dc.replace('undefined', None)

dc['port'] = pd.to_numeric(dc['port'], errors='coerce')
dc['user_id'] = pd.to_numeric(dc['user_id'], errors='coerce')
dc['datetime'] = pd.to_datetime(dc['datetime'], format='%Y-%m-%dT%H:%M:%S.%fZ', errors='coerce')
dc['datetime'] = dc['datetime'] + timedelta(hours=3)
dc['date'] = dc['datetime'].dt.date
dc['hour'] = dc['datetime'].dt.hour
dc['duration'] = pd.to_numeric(dc['duration'], errors='coerce')
dc['size'] = pd.to_numeric(dc['size'], errors='coerce')
dc = dc.dropna(subset=['datetime']).reset_index(drop=True)

При обработке файла доступна следующая информация:

  • datetime — дата и время в московском часовом поясе
  • type — тип лога: ERROR, DEBUG, LOG
  • category — категория лога
  • message — текстовая информация
  • user_id — идентификатор пользователя
  • port — локальный порт процесса
  • claims — роли пользователя
  • duration — продолжительность запроса в миллисекундах
  • size — размерность, для некоторых категорий содержит информацию в байтах, а для некоторых (RPC_PACKAGE) — размерность массива
  • name — пользовательское имя функции

Примечание: на этом можно было и закончить, но можно ещё отфильтровать действия пользователя.

Шаг 4. Действия пользователя

Достаем информацию об ответах от сервера. Для этого проверяем все строки с категориями RPC_RESPONSE, REPORT_RESPONSE, SYNC_DURATION.

  • RPC_RESPONSE — ответы от RPC
  • REPORT_RESPONSE — выгрузка отчётов
  • SYNC_DURATION — выполнение синхронизации на сервере

На основе полученной информации формируем новый DataFrame:

df = dc.loc[dc['category'].isin(['RPC_RESPONSE', 'SYNC_DURATION', 'REPORT_RESPONSE']), ['datetime', 'date', 'hour', 'name', 'category', 'duration', 'user_id', 'port', 'claims', 'size']]
df['category'] = df['category'].replace({'RPC_RESPONSE': 'rpc', 'SYNC_DURATION': 'sync', 'REPORT_RESPONSE': 'report'})

Примечание: на основе полученной информации можно отслеживать, какие запросы самые долгие, сколько они выполнялись и кем.

Пример DataFrame
Print Friendly, PDF & Email

Добавить комментарий