Source code for cobald.monitor.format_line
from collections.abc import Mapping
from logging import Formatter, LogRecord
from typing import Dict, Set, Union, Any, TypeVar
from .format_json import RECORD_ATTRIBUTES
T = TypeVar("T")
[docs]
def escape_key(key: str) -> str:
assert isinstance(key, str)
return key.replace(r",", r"\,").replace(r"=", r"\=").replace(r" ", r"\ ")
[docs]
def escape_field(field: T) -> T:
if isinstance(field, str):
return '"' + field.replace("\\", r"\\").replace('"', r"\"") + '"'
return field
[docs]
def line_protocol(
name, tags: dict = None, fields: dict = None, timestamp: float = None
) -> str:
"""
Format a report as per InfluxDB line protocol
:param name: name of the report
:param tags: tags identifying the specific report
:param fields: measurements of the report
:param timestamp: when the measurement was taken, in **seconds** since the epoch
"""
_escape_key = escape_key
_escape_field = escape_field
output_str = name.replace(r",", r"\,").replace(r" ", r"\ ")
if tags:
output_str += ","
output_str += ",".join(
"%s=%s" % (_escape_key(key), _escape_key(value))
for key, value in sorted(tags.items())
)
output_str += " "
output_str += ",".join(
("%s=%s" % (_escape_key(key), _escape_field(value))).replace("'", '"')
for key, value in sorted(fields.items())
)
if timestamp is not None:
# line protocol requires nanosecond precision, python uses seconds
output_str += " %d" % (timestamp * 1e9)
return output_str + "\n"
[docs]
class LineProtocolFormatter(Formatter):
"""
Formatter that emits data as InfluxDB Line Protocol
:param tags: record data to use as tags
:param resolution: resolution of timestamps in seconds
The ``tags`` act as a whitelist for record keys if they are an iterable.
When a dictionary is supplied, its values act as default values if the
key is not in a record.
The ``resolution`` allows summarising data by downsampling the timestamps
to the given resolution, e.g. for a ``resolution`` of ``10`` you can expect
timestamps 10, 20, 30, ...
If ``resolution`` is ``None`` the timestamp is omitted from the Line Protocol
and Telegraf will take care on setting the current timestamp.
"""
def __init__(
self,
tags: Union[Dict[str, Any], Set[str], None] = None,
resolution: float = None,
):
super().__init__()
self._default_tags = tags if isinstance(tags, Mapping) else {}
self._tags_whitelist = set(tags) if tags is not None else set()
self._fields_blacklist = self._tags_whitelist | set(RECORD_ATTRIBUTES)
self._resolution = resolution
[docs]
def format(self, record: LogRecord) -> str:
args = record.args
if args == ({},): # logger.info('message', {}) -> record.args == ({},)
args = {}
assert isinstance(
args, Mapping
), "monitor record argument must be a mapping, not %r" % type(args)
assert all(
elem is not None for elem in args.values()
), "line protocol values must not be None"
record.asctime = self.formatTime(record, self.datefmt)
record.message = record.getMessage() if args else record.msg
tags = self._default_tags.copy()
tags.update(
{key: value for key, value in args.items() if key in self._tags_whitelist}
)
fields = {
key: value
for key, value in args.items()
if key not in self._fields_blacklist
}
timestamp = (
record.created // self._resolution * self._resolution
if self._resolution is not None
else None
)
return line_protocol(
name=record.message, tags=tags, fields=fields, timestamp=timestamp
)
if __name__ == "__main__":
import logging
logger = logging.getLogger()
logger.handlers = [logging.StreamHandler()]
logger.handlers[0].formatter = LineProtocolFormatter(
{"latitude": 49, "longitude": 8}
)
logger.warning("forecast", {"temperature": 298, "humidity": 0.45})