Source code for opentelemetry.exporter.otlp.proto.http.metric_exporter

# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import gzip
import logging
import random
import threading
import zlib
from io import BytesIO
from os import environ
from time import time
from typing import (  # noqa: F401
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Sequence,
)

import requests
from requests.exceptions import ConnectionError
from typing_extensions import deprecated

from opentelemetry.exporter.otlp.proto.common._internal import (
    _get_resource_data,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
    OTLPMetricExporterMixin,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
    encode_metrics,
)
from opentelemetry.exporter.otlp.proto.http import (
    _OTLP_HTTP_HEADERS,
    Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
    _is_retryable,
    _load_session_from_envvar,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (  # noqa: F401
    ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import (  # noqa: F401
    AnyValue,
    ArrayValue,
    InstrumentationScope,
    KeyValue,
    KeyValueList,
)
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2  # noqa: F401
from opentelemetry.proto.resource.v1.resource_pb2 import Resource  # noqa: F401
from opentelemetry.proto.resource.v1.resource_pb2 import (
    Resource as PB2Resource,
)
from opentelemetry.sdk.environment_variables import (
    _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER,
    OTEL_EXPORTER_OTLP_CERTIFICATE,
    OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
    OTEL_EXPORTER_OTLP_CLIENT_KEY,
    OTEL_EXPORTER_OTLP_COMPRESSION,
    OTEL_EXPORTER_OTLP_ENDPOINT,
    OTEL_EXPORTER_OTLP_HEADERS,
    OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
    OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
    OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
    OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
    OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
    OTEL_EXPORTER_OTLP_METRICS_HEADERS,
    OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
    OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.sdk.metrics.export import (  # noqa: F401
    AggregationTemporality,
    Gauge,
    MetricExporter,
    MetricExportResult,
    MetricsData,
    Sum,
)
from opentelemetry.sdk.metrics.export import (  # noqa: F401
    Histogram as HistogramType,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


# Compression default exposed by this module (no compression).
DEFAULT_COMPRESSION = Compression.NoCompression
# Base endpoint used when OTEL_EXPORTER_OTLP_ENDPOINT is not set; the metrics
# path below is appended to it by _append_metrics_path().
DEFAULT_ENDPOINT = "http://localhost:4318/"
# Path appended to a base endpoint to form the metrics export URL.
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
# Fallback request timeout when neither the constructor argument nor the
# timeout environment variables provide one.
DEFAULT_TIMEOUT = 10  # in seconds
# Upper bound on the number of export attempts made by
# OTLPMetricExporter.export() (the loop runs range(_MAX_RETRYS)).
_MAX_RETRYS = 6


class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin):
    """Metric exporter that POSTs serialized metrics to an OTLP/HTTP endpoint.

    Every constructor argument is optional. When an argument is falsy, the
    metrics-specific environment variable is consulted first, then the
    generic OTLP environment variable, then a module-level default.
    """

    def __init__(
        self,
        endpoint: str | None = None,
        certificate_file: str | None = None,
        client_key_file: str | None = None,
        client_certificate_file: str | None = None,
        headers: dict[str, str] | None = None,
        timeout: float | None = None,
        compression: Compression | None = None,
        session: requests.Session | None = None,
        preferred_temporality: dict[type, AggregationTemporality]
        | None = None,
        preferred_aggregation: dict[type, Aggregation] | None = None,
    ):
        """Resolve the exporter configuration and prepare the HTTP session.

        Args:
            endpoint: Full URL to post metrics to. Falls back to
                ``OTEL_EXPORTER_OTLP_METRICS_ENDPOINT``, then to
                ``OTEL_EXPORTER_OTLP_ENDPOINT`` (or ``DEFAULT_ENDPOINT``)
                with the metrics path appended.
            certificate_file: Value passed as ``verify`` to the session's
                ``post``. Falls back to the certificate environment
                variables, then to ``True``.
            client_key_file: Client key file; falls back to the client-key
                environment variables.
            client_certificate_file: Client certificate file; falls back to
                the client-certificate environment variables.
            headers: Extra request headers. Falls back to the headers parsed
                from the headers environment variables.
            timeout: Export timeout in seconds. Falls back to the timeout
                environment variables, then to ``DEFAULT_TIMEOUT``.
            compression: Payload compression. Falls back to
                ``_compression_from_env()``.
            session: Session used for requests. Falls back to the session
                loaded via ``_load_session_from_envvar``, then to a new
                ``requests.Session``.
            preferred_temporality: Forwarded to ``_common_configuration``.
            preferred_aggregation: Forwarded to ``_common_configuration``.
        """
        # Set by shutdown() so that a backoff wait in export() wakes up early.
        self._shutdown_in_progress = threading.Event()
        self._endpoint = endpoint or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
            _append_metrics_path(
                environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
            ),
        )
        self._certificate_file = certificate_file or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
            environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True),
        )
        self._client_key_file = client_key_file or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_CLIENT_KEY,
            environ.get(OTEL_EXPORTER_OTLP_CLIENT_KEY, None),
        )
        self._client_certificate_file = client_certificate_file or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_CLIENT_CERTIFICATE,
            environ.get(OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, None),
        )
        # A (certificate, key) pair when both are known, otherwise just the
        # certificate value (which may be None).
        self._client_cert = (
            (self._client_certificate_file, self._client_key_file)
            if self._client_certificate_file and self._client_key_file
            else self._client_certificate_file
        )
        headers_string = environ.get(
            OTEL_EXPORTER_OTLP_METRICS_HEADERS,
            environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""),
        )
        self._headers = headers or parse_env_headers(
            headers_string, liberal=True
        )
        self._timeout = timeout or float(
            environ.get(
                OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
                environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
            )
        )
        self._compression = compression or _compression_from_env()
        self._session = (
            session
            or _load_session_from_envvar(
                _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER
            )
            or requests.Session()
        )
        # Apply our defaults first, then the configured headers, so that
        # users can override our defaults.
        self._session.headers.update(_OTLP_HTTP_HEADERS)
        self._session.headers.update(self._headers)
        if self._compression is not Compression.NoCompression:
            self._session.headers.update(
                {"Content-Encoding": self._compression.value}
            )

        self._common_configuration(
            preferred_temporality, preferred_aggregation
        )
        self._shutdown = False

    def _export(
        self, serialized_data: bytes, timeout_sec: Optional[float] = None
    ):
        """Compress (if configured) and POST one serialized payload.

        Args:
            serialized_data: The already-serialized request body.
            timeout_sec: Request timeout in seconds; ``self._timeout`` is
                used when this is ``None``.

        Returns:
            The response object returned by the session's ``post``.
        """
        data = serialized_data
        if self._compression == Compression.Gzip:
            gzip_data = BytesIO()
            with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
                gzip_stream.write(serialized_data)
            data = gzip_data.getvalue()
        elif self._compression == Compression.Deflate:
            data = zlib.compress(serialized_data)

        if timeout_sec is None:
            timeout_sec = self._timeout

        request_kwargs = {
            "url": self._endpoint,
            "data": data,
            "verify": self._certificate_file,
            "timeout": timeout_sec,
            "cert": self._client_cert,
        }

        # By default, keep-alive is enabled in Session's request
        # headers. Backends may choose to close the connection
        # while a post happens which causes an unhandled
        # exception. This try/except will retry the post on such exceptions
        try:
            resp = self._session.post(**request_kwargs)
        except ConnectionError:
            resp = self._session.post(**request_kwargs)
        return resp

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: Optional[float] = 10000,
        **kwargs,
    ) -> MetricExportResult:
        """Serialize and send a batch of metrics, retrying with backoff.

        At most ``_MAX_RETRYS`` attempts are made, all within a deadline of
        ``self._timeout`` seconds measured from the start of this call.

        Args:
            metrics_data: The metrics to encode and export.
            timeout_millis: Not read by this implementation; the deadline is
                derived from the exporter's configured timeout instead.
            **kwargs: Not read by this implementation.

        Returns:
            ``MetricExportResult.SUCCESS`` if a response reports ``ok``,
            otherwise ``MetricExportResult.FAILURE``.
        """
        if self._shutdown:
            _logger.warning("Exporter already shutdown, ignoring batch")
            return MetricExportResult.FAILURE
        serialized_data = encode_metrics(metrics_data).SerializeToString()
        deadline_sec = time() + self._timeout
        for retry_num in range(_MAX_RETRYS):
            remaining_sec = deadline_sec - time()
            if remaining_sec <= 0:
                # The backoff wait can end on or just past the deadline; a
                # zero or negative timeout is rejected by the HTTP client
                # with a ValueError, so fail the export cleanly instead.
                _logger.error(
                    "Failed to export metrics batch: timeout of %.2fs exceeded.",
                    self._timeout,
                )
                return MetricExportResult.FAILURE
            resp = self._export(serialized_data, remaining_sec)
            if resp.ok:
                return MetricExportResult.SUCCESS
            # multiplying by a random number between .8 and 1.2 introduces a
            # +/-20% jitter to each backoff.
            backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
            if (
                not _is_retryable(resp)
                or retry_num + 1 == _MAX_RETRYS
                or backoff_seconds > (deadline_sec - time())
                or self._shutdown
            ):
                _logger.error(
                    "Failed to export metrics batch code: %s, reason: %s",
                    resp.status_code,
                    resp.text,
                )
                return MetricExportResult.FAILURE
            _logger.warning(
                "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.",
                resp.reason,
                backoff_seconds,
            )
            # Sleep for the backoff, but wake immediately if shutdown() is
            # called in the meantime.
            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
            if shutdown:
                _logger.warning("Shutdown in progress, aborting retry.")
                break
        return MetricExportResult.FAILURE

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Mark the exporter as shut down and close the HTTP session.

        A second call only logs a warning. ``timeout_millis`` and
        ``**kwargs`` are not read by this implementation.
        """
        if self._shutdown:
            _logger.warning("Exporter already shutdown, ignoring call")
            return
        self._shutdown = True
        # Wake any export() call currently waiting out a retry backoff.
        self._shutdown_in_progress.set()
        self._session.close()

    @property
    def _exporting(self) -> str:
        """Name of the signal handled by this exporter."""
        return "metrics"

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Nothing is buffered in this exporter, so this method does nothing."""
        return True
@deprecated(
    "Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.",
)
def get_resource_data(
    sdk_resource_scope_data: Dict[SDKResource, Any],  # ResourceDataT?
    resource_class: Callable[..., PB2Resource],
    name: str,
) -> List[PB2Resource]:
    """Deprecated pass-through to ``_get_resource_data``.

    All three arguments are forwarded positionally and unchanged, and the
    helper's result is returned as-is.
    """
    resource_data = _get_resource_data(
        sdk_resource_scope_data, resource_class, name
    )
    return resource_data
def _compression_from_env() -> Compression:
    """Build the ``Compression`` setting from the environment.

    The metrics-specific variable wins over the generic one; ``"none"`` is
    used when neither is set. The value is lower-cased and stripped before
    being handed to ``Compression``.
    """
    generic_setting = environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none")
    raw_setting = environ.get(
        OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, generic_setting
    )
    return Compression(raw_setting.lower().strip())


def _append_metrics_path(endpoint: str) -> str:
    """Return *endpoint* with the default metrics export path appended.

    Exactly one ``/`` is inserted between the two parts when *endpoint*
    does not already end with one.
    """
    separator = "" if endpoint.endswith("/") else "/"
    return f"{endpoint}{separator}{DEFAULT_METRICS_EXPORT_PATH}"