Ajustar la escala de los clústeres de Dataproc de forma automática

¿Qué es el ajuste de escala automático?

Es difícil calcular la cantidad “adecuada” trabajadores (nodos) del clúster de una carga de trabajo, un tamaño único del clúster para toda la canalización no suele ser lo ideal. El Escalamiento de clústeres iniciado por parte del usuario aborda de forma parcial este desafío, pero requiere la supervisión del uso y la intervención manual del clúster.

La API de AutoscalingPolicies de Dataproc proporciona un mecanismo para automatizar la administración de recursos del clúster y habilitar el ajuste de escala automático de la VM de trabajador del clúster. Una Autoscaling Policy es una configuración reutilizable que describe cómo se deben escalar los trabajadores del clúster que usan la política de ajuste de escala automático. Define los límites de escalamiento, frecuencia y agresividad para proporcionar un control detallado sobre los recursos de los clústeres a lo largo de su ciclo de vida.

Cuándo usar el ajuste de escala automático

Usa el ajuste de escala automático:

en clústeres que almacenan datos en servicios externos, como Cloud Storage o BigQuery

en clústeres que procesan muchos trabajos

para escalar clústeres de un solo trabajo

con el modo de flexibilidad mejorada para los trabajos por lotes de Spark

El ajuste de escala automático no se recomienda para o con:

  • HDFS: El ajuste de escala automático no se diseñó para escalar HDFS en el clúster:

    1. El uso de HDFS no es un indicador para el ajuste de escala automático.
    2. Los datos HDFS solo se alojan en trabajadores principales. La cantidad de trabajadores principales debe ser suficiente para alojar todos los datos de HDFS.
    3. El retiro de DataNodes de HDFS puede retrasar la eliminación de trabajadores. Los datanodes copian bloques de HDFS a otros DataNodes antes de quitar un trabajador. Según el tamaño de los datos y el factor de replicación, este proceso puede tomar horas.
  • Etiquetas de nodos de YARN: el ajuste de escala automático no es compatible con etiquetas de nodo de YARN, ni la propiedad dataproc:am.primary_only debido a YARN-9088. YARN informa de manera incorrecta las métricas del clúster cuando se usan etiquetas de nodo.

  • Transmisión estructurada de Spark: El ajuste de escala automático no es compatible Transmisión estructurada de Spark (consulta Ajuste de escala automático y transmisión estructurada de Spark).

  • Clústeres inactivos: No se recomienda el ajuste de escala automático para reducir el tamaño de un clúster al tamaño mínimo cuando este está inactivo. Dado que la creación de un clúster nuevo es tan rápida como cambiar el tamaño de uno, considera borrar los clústeres inactivos y volver a crearlos. Las siguientes herramientas son compatibles con este modelo “efímero”:

    Usa los flujos de trabajo de Dataproc para programar un conjunto de trabajos en un clúster dedicado y, luego, borra el clúster cuando finalicen los trabajos. Para una organización más avanzada, usa Cloud Composer, que se basa en Apache Airflow.

    Para clústeres que procesan consultas ad hoc o cargas de trabajo programadas externamente, usa la eliminación programada de clústeres para borrar el clúster después de un período o duración de inactividad que se especificó, o en un momento en particular.

  • Cargas de trabajo de diferentes tamaños: Cuando los trabajos pequeños y grandes se ejecutan en un clúster, el retiro de servicio ordenado y la reducción de escala esperará a que finalicen los trabajos grandes. Un trabajo de larga duración retrasará el ajuste de escala automático recursos para trabajos más pequeños que se ejecutan en el clúster hasta que el trabajo de larga duración para finalizar la tarea. Para evitar este resultado, agrupa trabajos más pequeños de tamaño similar en un clúster y aislar cada trabajo de larga duración en un clúster independiente.

Habilita el ajuste de escala automático

Para habilitar el ajuste de escala automático en un clúster, haz lo siguiente:

  1. Crea una política de ajuste de escala automático

  2. Realiza una de las siguientes acciones:

    1. Crea un clúster de ajuste de escala automático
    2. Habilita el ajuste de escala automático en un clúster existente.

Crea una política de ajuste de escala automático

Comando de gcloud

Puedes usar el comando gcloud dataproc autoscaling-policies import para crear una política de ajuste de escala automático. Lee un archivo local YAML que define una política de ajuste de escala automático. El formato y el contenido del archivo deben coincidir con los objetos de configuración y los campos que define la API de REST autoscalingPolicies.

El siguiente ejemplo de YAML define una política que especifica todos los campos obligatorios. También proporciona laminInstances ymaxInstances los valores de los trabajadores principales,maxInstances valor para los trabajadores secundarios (interrumpibles) y especifica un plazo de 4 minutoscooldownPeriod (el valor predeterminado es de 2 minutos). workerConfig configura los trabajadores principales. En este ejemplo, minInstances y maxInstances se configuran con el mismo valor para evitar escalar los trabajadores principales.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Este es otro ejemplo de YAML que especifica todos los campos opcionales y obligatorios de la política de ajuste de escala automático.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell para crear la política de ajuste de escala automático. Proporciona un nombre para la política. Este nombre se convertirá en el id de la política, que puedes usar en comandos de gcloud posteriores para hacer referencia a la política. Usa la marca --source a fin de especificar la ruta de acceso del archivo local y el nombre de archivo del archivo YAML de la política de ajuste de escala automático para importar.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

API de REST

Para crear una política de ajuste de escala automático, define una AutoscalingPolicy como parte de una solicitud autoscalingPolicies.create.

Console

Para crear una política de ajuste de escala automático, selecciona CREATE POLICY del Políticas de ajuste de escala automático con la consola de Google Cloud. En la página Crear política, puedes seleccionar un panel de recomendación de política a fin de propagar los campos de la política de ajuste de escala automático para un tipo de trabajo o un objetivo de escalamiento específicos.

Crea un clúster de ajuste de escala automático

Después de crear una política de ajuste de escala automático, crea un clúster que use esta política. El clúster debe estar en la misma región que la política de ajuste de escala automático.

Comando de gcloud

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell para crear un clúster de ajuste de escala automático. Proporciona un nombre para el clúster y usa la marca --autoscaling-policy a fin de especificar el policy id (el nombre de la política que especificaste cuando creaste la política) o la política resource URI (resource name) (consulta los campos id y name de AutoscalingPolicy ).

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API de REST

Crea un clúster de ajuste de escala automático mediante la inclusión de AutoscalingConfig como parte de una solicitud clusters.create.

Console

Puedes seleccionar una política de ajuste de escala automático existente para aplicarla a un clúster nuevo en la sección Política de ajuste de escala automático del clúster Configura en el panel de Dataproc Crea un clúster de la consola de Google Cloud.

Habilita el ajuste de escala automático en un clúster existente

Después de crear una política de ajuste de escala automático, puedes habilitarla en un clúster existente en la misma región.

Comando de gcloud

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell a fin de habilitar una política de ajuste de escala automático en un clúster existente. Proporciona el nombre del clúster y usa la marca --autoscaling-policy para especificar el policy id (el nombre de la política que especificaste cuando creaste la política) o la política resource URI (resource name) (consulta los campos id y name de AutoscalingPolicy).

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API de REST

Para habilitar una política de ajuste de escala automático en un clúster existente, configura el AutoscalingConfig.policyUri de la política en el updateMask de una solicitud clusters.patch.

Console

Actualmente, habilitar una política de ajuste de escala automático en un clúster existente compatibles con la consola de Google Cloud.

Uso de la política en varios clústeres

  • Una política de ajuste de escala automático define el comportamiento de escalamiento que se puede aplicar a varios clústeres. Una política de ajuste de escala automático se aplica mejor en varios clústeres cuando estos compartan cargas de trabajo similares o ejecuten trabajos con patrones de uso de recursos similares.

  • Puedes actualizar una política que varios clústeres usan. Las actualizaciones afectan de inmediato el comportamiento del ajuste de escala automático de todos los clústeres que usan la política (consulta autoscalingPolicies.update). Si no deseas que una actualización de la política se aplique a un clúster que usa la política, inhabilita el ajuste de escala automático en el clúster antes de actualizarla.

Comando de gcloud

Ejecuta el comando de gcloud siguiente desde una terminal local o en Cloud Shell para inhabilitar el ajuste de escala automático en un clúster.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

API de REST

Para inhabilitar el ajuste de escala automático en un clúster, establece AutoscalingConfig.policyUri en la string vacía y establece update_mask=config.autoscaling_config.policy_uri en una solicitud clusters.patch.

Console

Actualmente, no se admite inhabilitar el ajuste de escala automático en un clúster en Consola de Google Cloud

Cómo funciona el ajuste de escala automático

El ajuste de escala automático verifica las métricas de Hadoop YARN del clúster a medida que transcurre el período de enfriamiento para determinar si se debe escalar el clúster y, si es así, la magnitud de la actualización.

  1. Valor de la métrica de recursos pendientes de YARN (memoria pendiente o núcleos pendientes) determina si debe aumentar o reducir la escala verticalmente. Un valor superior a 0 indica que los trabajos YARN están esperando recursos y que puede ser necesario escalar verticalmente. Un valor 0 indica YARN tiene recursos suficientes para que una reducción de la escala u otros cambios que podrían no ser obligatorias.

    Si el recurso pendiente es > 0)

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Pending + Available + Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    Si el recurso pendiente es 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]

    De forma predeterminada, el escalador automático supervisa el recurso de memoria YARN. Si Habilitas el ajuste de escala automático basado en núcleos, se supervisan la memoria YARN y los núcleos YARN: estimated_worker_count se evalúa por separado para memoria y núcleos, y se selecciona el mayor número de trabajadores resultante.

    $estimated\_worker\_count =$

    \[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]

  2. Dado el cambio estimado necesario en la cantidad de trabajadores, el ajuste de escala automático utiliza un scaleUpFactor o scaleDownFactor para calcular el cambio real en cantidad de trabajadores:

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Un scaleUpFactor o scaleDownFactor de 1.0 significa que el ajuste de escala automático escalará para que recurso pendiente/disponible es 0 (uso perfecto).

  3. Una vez que se calcula el cambio en la cantidad de trabajadores, scaleUpMinWorkerFraction y scaleDownMinWorkerFraction actúan como para determinar si el ajuste de escala automático escalará el clúster. Una fracción pequeña significa que el ajuste de escala automático debe escalar incluso si el Δworkers es pequeño. Una fracción mayor significa que el escalamiento solo debe suceder cuando el Δworkers es grande.

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    O BIEN
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. Si la cantidad de trabajadores a fin de escalar es bastante grande para activar el escalamiento, el ajuste de escala automático usa los límitesminInstances y maxInstances de workerConfig y secondaryWorkerConfig y weight (proporción de trabajadores principales a secundarios) a fin de determinar cómo dividir la cantidad de trabajadores en los grupos de instancias de trabajadores principales y secundarios. El resultado de estos cálculos es el cambio de ajuste de escala automático final en el clúster para el período de escalamiento.

  5. Las solicitudes de reducción de escala automático se cancelarán en los clústeres creados con la imagen versiones posteriores a 2.0.57 y 2.1.5 si:

    1. hay una reducción de escala en curso con un tiempo de espera de retiro de servicio ordenado distinto de cero valor
    2. la cantidad de trabajadores YARN ACTIVOS (“trabajadores activos”) más el cambio en la cantidad total de trabajadores que recomienda el escalador automático La métrica (Δworkers) es igual o mayor que DECOMMISSIONING trabajadores YARN (“retiro de servicio de trabajo”), como se muestra en la siguiente fórmula:

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    Para ver un ejemplo de cancelación de reducción de escala, consulta ¿Cuándo el ajuste de escala automático cancela una operación de reducción de escala?

Recomendaciones de configuración del ajuste de escala automático

Evita escalar trabajadores principales

Los trabajadores principales ejecutan Datanodes de HDFS, mientras que los trabajadores secundarios solo ejecutan el procesamiento. El uso de trabajadores secundarios te permite escalar de forma eficiente los recursos de procesamiento sin la necesidad de aprovisionar almacenamiento, lo que da como resultado capacidades de escalamiento más rápido. Los namenodes de HDFS pueden tener varias condiciones de carrera que hacen que HDFS se dañe, de modo que el retiro de servicio queda atascado indefinidamente. Para evitar este problema, evitar escalar los trabajadores principales. Por ejemplo: workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

Hay algunas modificaciones que se deben realizar en el comando de creación del clúster:

  1. Configura --num-workers=10 para que coincida con el tamaño del grupo de trabajadores principales de la política de ajuste de escala automático.
  2. Configura --secondary-worker-type=non-preemptible para configurar los trabajadores secundarios a fin de que no sean interrumpibles. A menos que se deseen las VM interrumpibles.
  3. Copia la configuración de hardware de los trabajadores principales a los secundarios. Por ejemplo, configura --secondary-worker-boot-disk-size=1000GB para que coincida con --worker-boot-disk-size=1000GB.

Usa el modo de flexibilidad mejorada para los trabajos por lotes de Spark

Usar el Modo de flexibilidad mejorada (EFM) con el ajuste de escala automático para lo siguiente:

permitir una reducción más rápida de la escala del clúster mientras se ejecutan los trabajos

evitar la interrupción de los trabajos en ejecución debido para reducir la escala verticalmente del clúster

para minimizar la interrupción de los trabajos en ejecución debido a la interrupción de los trabajadores secundarios interrumpibles

Con EFM habilitado, el tiempo de espera del retiro de servicio ordenado de una política de ajuste de escala automático debe establecerse en 0s. La política de ajuste de escala automático solo debe realizar un ajuste de escala automático de trabajadores secundarios.

Elige un tiempo de espera de retiro de servicio ordenado

El ajuste de escala automático admite Retiro de servicio ordenado de YARN cuando se quitan nodos de un clúster. El retiro de servicio ordenado permite que las aplicaciones terminen de reproducir aleatoriamente datos entre etapas para evitar volver a establecer la configuración el progreso del trabajo. El ⁠tiempo de espera de retiro de servicio ordenado que se proporciona en una política de ajuste de escala automático es el límite superior de duración que esperará YARN para aplicaciones en ejecución (aplicaciones que se ejecutaban cuando se inició el retiro) antes de quitar los nodos.

Cuando un proceso no se completa dentro del tiempo de espera de retiro de servicio ordenado especificado, del nodo trabajador se cierra de manera forzosa, lo que puede provocar la pérdida de datos interrupción del servicio. Para evitar esta posibilidad, establece el tiempo de espera del retiro de servicio ordenado en un valor mayor que el el trabajo más largo que procesará el clúster. Por ejemplo, si esperas que que se ejecute el trabajo durante una hora, establece el tiempo de espera en al menos una hora (1h).

Considera migrar trabajos que tarden más de 1 hora en sus propios clústeres efímeros para evitar bloquear el retiro de servicio ordenado.

Configura scaleUpFactor

scaleUpFactor controla la agresividad con la que el escalador automático escala verticalmente un clúster. Especifica un número entre 0.0 y 1.0 para establecer el valor fraccionario de un recurso pendiente de YARN que provoca la adición de nodos.

Por ejemplo, si hay 100 contenedores pendientes que solicitan 512 MB cada uno, hay 50 GB de memoria YARN pendiente. Si scaleUpFactor es 0.5, el escalador automático agregará suficientes nodos para agregar 25 GB de memoria YARN. De forma similar, si es 0.1, el escalador automático agregará suficientes nodos por 5 GB. Ten en cuenta que estos valores corresponden a la memoria YARN, no a la memoria total disponible en una VM.

Un buen punto de partida es 0.05 para los trabajos de MapReduce y los trabajos de Spark con la asignación dinámica habilitada. Para los trabajos de Spark con un recuento de ejecutor fijo y trabajos de Tez, usa 1.0. Un scaleUpFactor de 1.0 significa que el ajuste de escala automático escalará para que el recurso pendiente/disponible es 0 (uso perfecto).

Configura scaleDownFactor

scaleDownFactor controla la agresividad con la que el escalador automático disminuye un clúster. Especifica un número entre 0.0 y 1.0 para establecer el valor fraccionario del recurso disponible de YARN que provoca la eliminación del nodo.

Deja este valor en 1.0 para la mayoría de los clústeres de varios trabajos que necesitan escalar verticalmente y con frecuencia. Como resultado del retiro de servicio ordenado, las operaciones de reducción de escala mucho más lento que las operaciones de escalamiento vertical. Establece scaleDownFactor=1.0 conjuntos una tasa de reducción de escala agresiva, que minimiza la cantidad de operaciones de reducción de escala necesarias para lograr el tamaño de clúster adecuado.

En el caso de los clústeres que necesitan más estabilidad, establece un scaleDownFactor más bajo para un y reducir la escala verticalmente.

Establece este valor en 0.0 para evitar reducir la escala verticalmente del clúster, por ejemplo, cuando con clústeres efímeros o de un solo trabajo.

Configura scaleUpMinWorkerFraction y scaleDownMinWorkerFraction

Se usan scaleUpMinWorkerFraction y scaleDownMinWorkerFraction con scaleUpFactor o scaleDownFactor y tienen valores predeterminados valores de 0.0. Representan los umbrales en los que el escalador automático aumentar o reducir verticalmente la escala del clúster: el aumento mínimo del valor fraccionario o disminuir el tamaño del clúster necesario para emitir solicitudes de escalamiento vertical o reducción.

Ejemplos: el escalador automático no emitirá una solicitud de actualización para agregar 5 trabajadores a un Clúster de 100 nodos, a menos que scaleUpMinWorkerFraction sea menor o igual que 0.05 (5%). Si se configura como 0.1, el escalador automático no emitirá la solicitud para escalar verticalmente el clúster. De manera similar, si scaleDownMinWorkerFraction es 0.05, el escalador automático no emitir una solicitud de actualización para quitar nodos de un clúster de 100 nodos, se deben quitar al menos 5 nodos.

El valor predeterminado de 0.0 significa que no hay umbral.

Configuración más alta scaleDownMinWorkerFractionthresholds en clústeres grandes (más de 100 nodos) para evitar operaciones de escalamiento pequeñas e innecesarias muy recomendada.

Elige un período de enfriamiento

cooldownPeriod establece un período durante el cual el escalador automático no emitirá solicitudes para cambiar el tamaño del clúster. Puedes usarlo para limitar la frecuencia de los cambios del escalador automático al tamaño del clúster.

El cooldownPeriod mínimo y predeterminado es de dos minutos. Si se configura un cooldownPeriod más corto en una política, los cambios en la carga de trabajo afectarán más rápido el tamaño del clúster, pero los clústeres podrían escalarse verticalmente o reducirse de forma innecesaria. Se recomienda configurar el scaleUpMinWorkerFraction y scaleDownMinWorkerFraction de una política en un valor distinto de cero cuando uses un cooldownPeriod más corto. Esto garantiza que el clúster solo aumenta o reduce la escala verticalmente cuando el cambio en el uso de recursos es suficientes para garantizar una actualización del clúster.

Si tu carga de trabajo es sensible a los cambios en el tamaño del clúster, puedes aumentar el período de inactividad. Por ejemplo, si ejecutas un lote procesamiento, puedes establecer el período de inactividad en 10 minutos o más. Experimentar con diferentes períodos de inactividad para encontrar el valor que funcione mejor para la carga de trabajo.

Límites de recuento de trabajadores y pesos de grupo

Cada grupo de trabajadores tiene minInstances y maxInstances que configuran un límite estricto en cuanto al tamaño de cada grupo.

Cada grupo también tiene un parámetro llamado weight que configura el balanceo de destino entre los dos grupos. Ten en cuenta que este parámetro es solo una sugerencia y, si un grupo alcanza su tamaño mínimo o máximo, los nodos solo se agregarán o quitarán del otro grupo. Por lo tanto, weight siempre se puede dejar en el valor predeterminado 1.

Habilita el ajuste de escala automático basado en núcleos

De forma predeterminada, YARN usa métricas de memoria para la asignación de recursos. En las aplicaciones con uso intensivo de CPU, se recomienda configurar YARN para que use el Calculadora de recursos dominantes. Para hacer esto, configura la siguiente propiedad cuando crees un clúster:

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Ajuste de escala automático de métricas y registros

Los siguientes recursos y herramientas pueden ayudarte a supervisar las operaciones de ajuste de escala automático y su efecto en tu clúster y sus trabajos.

Cloud Monitoring

Usa Cloud Monitoring para hacer lo siguiente:

  • Visualizar las métricas que usa el ajuste de escala automático
  • ver la cantidad de administradores de nodos en tu clúster
  • comprender por qué el ajuste de escala automático escaló o no tu clúster autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

Usa Cloud Logging para ver los registros del escalador automático de Cloud Dataproc.

1) Busca registros para tu clúster.

autoscaling-logs-for-cluster

2) Selecciona dataproc.googleapis.com/autoscaler.

autoscaling-log-file

3) Expande los mensajes del registro para ver el campo status. Los registros están en JSON, un formato procesable.

autoscaling-three-logs autoscaling-update-operation

4) Expande el mensaje de registro para ver las recomendaciones de escalamiento, las métricas utilizadas a fin de tomar decisiones de escalamiento, el tamaño original del clúster y el tamaño nuevo del clúster de destino.

autoscaling-recommendation-message

Segundo plano: ajuste de escala automático con Apache Hadoop y Apache Spark

En las secciones a continuación, se analiza cómo interopera el ajuste de escala automático (o no) con Hadoop YARN y Hadoop Mapreduce, y con Apache Spark, Spark Streaming y Spark Structured Streaming.

Métricas de Hadoop YARN

El ajuste de escala automático se centra en las siguientes métricas de YARN de Hadoop:

  1. Allocated resource hace referencia al recurso total de YARN que ocupa la ejecución contenedores en todo el clúster. Si hay 6 contenedores en ejecución que puedes usar hasta 1 unidad de recurso, hay 6 recursos asignados.

  2. Available resource es un recurso de YARN en el clúster que no usa contenedores. Si hay 10 unidades de recursos en todos los administradores de nodos y 6 se asignan, hay 4 recursos disponibles. Si hay disponibles (sin usar) del clúster, el ajuste de escala automático puede quitar trabajadores del clúster.

  3. Pending resource es la suma de las solicitudes de recursos de YARN para contenedores pendientes. Los contenedores pendientes esperan a que se ejecute el espacio en YARN. El recurso pendiente es distinto de cero solo si el recurso disponible es cero o demasiado pequeño para asignarlo al siguiente contenedor. Si hay contenedores pendientes, el ajuste de escala automático puede agregar trabajadores al clúster.

Puedes ver estas métricas en Cloud Monitoring. Como configuración predeterminada, la memoria YARN será 0.8 * memoria total en el clúster, con memoria restante reservada para otros daemons y uso del sistema operativo, como la caché de la página. Puedes anular el valor predeterminado con la configuración de YARN “yarn.nodemanager.resource.memory-mb” (consulta Apache Hadoop YARN, HDFS, Spark y propiedades relacionadas).

Ajuste de escala automático y MapReduce de Hadoop

MapReduce ejecuta cada asignación y reduce la tarea como un contenedor de YARN independiente. Cuando se inicia un trabajo, MapReduce envía solicitudes de contenedor a cada tarea de asignación, lo que da como resultado un gran aumento en la memoria YARN. A medida que finalizan las tareas de asignación, la memoria pendiente disminuye.

Cuando mapreduce.job.reduce.slowstart.completedmaps se completa (el 95% de forma predeterminada en Dataproc), MapReduce agrega a la cola las solicitudes de contenedor para todos los reductores, lo que da como resultado otro aumento en la memoria pendiente.

A menos que las tareas de reducción y el mapeo tarden varios minutos o más, no establezcas un valor alto para el ajuste de escala automático scaleUpFactor. Agregar trabajadores al clúster tarda al menos 1.5 minutos, así que asegúrate de que haya suficiente trabajo pendiente para usar uno nuevo durante varios minutos. Un buen punto de partida es establecer scaleUpFactor en 0.05 (5%) o 0.1 (10%) de memoria pendiente.

Ajuste de escala automático y Spark

Spark agrega una capa adicional de programación sobre YARN. En particular, la asignación dinámica de Spark Core realiza solicitudes a YARN para contenedores que ejecuten los ejecutores de Spark y, luego, programa las tareas de Spark en los subprocesos de esos ejecutores. Los clústeres de Dataproc habilitan la asignación dinámica de forma predeterminada, por lo que los ejecutores se agregan y quitan según sea necesario.

Spark siempre solicita YARN para contenedores, pero sin la asignación dinámica, solo requiere contenedores al comienzo del trabajo. Con la asignación dinámica, quitará contenedores o solicitará nuevos, según sea necesario.

Spark comienza desde una pequeña cantidad de ejecutores (2 en clústeres de ajuste de escala automático) y continúa duplicando la cantidad de ejecutores mientras hay tareas pendientes. Esto reduce la memoria pendiente (menos aumentos repentinos de memoria pendiente). Se recomienda configurar el ajuste de escala automático scaleUpFactor en un número mayor, como 1.0 (100%), para los trabajos de Spark.

Inhabilita la asignación dinámica de Spark

Si ejecutas trabajos de Spark independientes que no se benefician de la asignación dinámica de Spark, puedes inhabilitar spark.dynamicAllocation.enabled=false y configurar spark.executor.instances. Aún puedes usar el ajuste de escala automático para aumentar o reducir el escalamiento de los clústeres mientras se ejecutan los trabajos de Spark independientes.

Trabajos de Spark con datos almacenados en caché

Configura spark.dynamicAllocation.cachedExecutorIdleTimeout o los conjuntos de datos que no están almacenados en caché cuando ya no sean necesarios. De forma predeterminada, Spark no quita los ejecutores que tienen datos almacenados en caché, lo que evitaría disminuir la escala del clúster.

Ajuste de escala automático y transmisión de Spark

  1. Dado que Spark Streaming tiene su propia versión de asignación dinámica que usa indicadores específicos de la transmisión para agregar y quitar ejecutores, configura spark.streaming.dynamicAllocation.enabled=true y inhabilitar la asignación dinámica de Spark Core configurando spark.dynamicAllocation.enabled=false.

  2. No uses el retiro de servicio ordenado (ajuste de escala automático de gracefulDecommissionTimeout) con trabajos de Spark Streaming. En su lugar, para quitar trabajadores de forma segura con el ajuste de escala automático, configura el punto de control para obtener la tolerancia a errores.

Como alternativa, para usar Spark Streaming sin ajuste de escala automático, haz lo siguiente:

  1. Inhabilita la asignación dinámica de Spark Core (spark.dynamicAllocation.enabled=false)
  2. Configura la cantidad de ejecutores (spark.executor.instances) de tu trabajo. Consulta Propiedades del clúster.

Ajuste de escala automático y Spark Structured Streaming

El ajuste de escala automático no es compatible con Spark Structured Streaming porque este no admite la asignación dinámica (consulta SPARK-24815: Structured Streaming debe admitir la asignación dinámica).

Controla el ajuste de escala automático a través de particiones y paralelismo

Si bien el paralelismo suele establecerse o determinarse en función de los recursos del clúster (por ejemplo, la cantidad de bloques de HDFS controla la cantidad de tareas), con el ajuste de escala automático sucede lo contrario: el ajuste de escala automático establece la cantidad de trabajadores según el paralelismo de trabajo. A continuación, se incluyen lineamientos que te ayudarán a configurar el paralelismo de trabajos:

  • Si bien Dataproc establece la cantidad predeterminada de MapReduce, reduce las tareas según el tamaño inicial del clúster de tu clúster, puedes configurar mapreduce.job.reduces para aumentar el paralelismo de la fase de reducción.
  • El paralelismo de Spark SQL y Dataframe está determinado por spark.sql.shuffle.partitions, que se establece de forma predeterminada en 200.
  • El valor predeterminado de las funciones RDD de Spark es spark.default.parallelism, que se establece en la cantidad de núcleos en los nodos trabajadores cuando se inicia el trabajo. Sin embargo, todos los RDD que crean Shuffles toman un parámetro para la cantidad de particiones, lo que anula spark.default.parallelism.

Debes asegurarte de que tus datos estén particionados de forma uniforme. Si existe una desviación significativa de la clave, una o más tareas pueden tardar mucho más tiempo que otras, lo que genera un uso bajo.

Ajuste de escala automático de la configuración predeterminada de las propiedades de Spark y Hadoop

Los clústeres de ajuste de escala automático tienen valores de propiedad de clúster predeterminados que ayudan a evitar fallas de trabajo cuando se quitan los trabajadores principales o se interrumpen los trabajadores secundarios. Puedes anular estos valores predeterminados cuando creas un clúster con ajuste de escala automático (consulta Propiedades del clúster).

Parámetros predeterminados de configuración para aumentar la cantidad máxima de reintentos de tareas, instancias principales y etapas:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Parámetros predeterminados de configuración para restablecer los contadores de reintentos (útil para trabajos de Spark Streaming de larga duración):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Configuración predeterminada para que el mecanismo de asignación dinámica de inicio lento de Spark comience desde un tamaño pequeño:

spark:spark.executor.instances=2

Preguntas frecuentes

¿Se puede habilitar el ajuste de escala automático en clústeres de alta disponibilidad y de un solo nodo?

El ajuste de escala automático se puede habilitar en clústeres de alta disponibilidad, pero no en clústeres de un solo nodo (los clústeres de un solo nodo no admiten el cambio de tamaño).

¿Puedes cambiar el tamaño de un clúster de ajuste de escala automático de forma manual?

Sí. Puedes decidir cambiar el tamaño de un clúster de forma manual como una medida de detención cuando se modifica una política de ajuste de escala automático. Sin embargo, estos cambios solo tendrán un efecto temporal, y el ajuste de escala automático escalará el clúster luego.

En lugar de cambiar el tamaño de un clúster de ajuste de escala automático de forma manual, considera lo siguiente:

Actualiza la política de ajuste de escala automático Cualquier cambio que se realice en la política de ajuste de escala automático afectará a todos los clústeres que usen la política en este momento (consulta Uso de la política en varios clústeres).

Desconecta la política y escala de forma manual el clúster hasta alcanzar el tamaño preferido.

Obtén asistencia de Dataproc.

¿En qué se diferencia Dataproc del ajuste de escala automático de Dataflow?

Consulta Ajuste de escala automático horizontal de Dataflow. y Ajuste de escala automático vertical de Dataflow Prime.

¿El equipo de desarrollo de Dataproc puede restablecer el estado de un clúster de ERROR a RUNNING?

En general, no. Esto requiere un esfuerzo manual para verificar si es seguro restablecer el estado del clúster y, a menudo, un clúster no se puede restablecer sin otros pasos manuales, como reiniciar el Namenode de HDFS.

Dataproc establece el estado de un clúster en ERROR cuando no puede determinar el estado de un clúster después de una operación con errores. Clústeres en ERROR no tienen ajuste de escala automático o no ejecutan trabajos. Entre las causas comunes, se incluyen las siguientes:

  1. Errores que se muestran desde la API de Compute Engine, a menudo durante las interrupciones de Compute Engine.

  2. HDFS entra en un estado dañado debido a errores en el retiro de servicio de HDFS.

  3. Errores de la API de control de Dataproc, como “Venció la asignación de tiempo de tareas”

Borra y vuelve a crear los clústeres cuyo estado sea ERROR.

¿Cuándo el ajuste de escala automático cancela una operación de reducción de escala?

El siguiente gráfico es una ilustración que muestra cuándo cancelar una operación de reducción de escala (consulta también Cómo funciona el ajuste de escala automático)

dataproc-autoscaling-cancellation-example

Notas:

  • El clúster tiene habilitado el ajuste de escala automático solo en función de las métricas de memoria YARN (configuración predeterminada).
  • T1-T9 representan los intervalos de inactividad cuando el escalador automático evalúa la cantidad de trabajadores (se simplificó el tiempo de los eventos)
  • Las barras apiladas representan los recuentos de elementos activos, retirados de servicio y retirados de servicio trabajadores YARN del clúster.
  • La cantidad de trabajadores recomendada del escalador automático (línea negra) se basa en las métricas de memoria YARN, el recuento de trabajadores activos de YARN y configuración de la política de ajuste de escala automático (consulta Cómo funciona el ajuste de escala automático).
  • El área de fondo roja indica el período durante el cual se está ejecutando la operación de reducción de escala.
  • El área de fondo amarilla indica el período durante el cual se cancela la operación de reducción de escala.
  • El área de fondo verde indica el período de la operación de escalamiento vertical.

Las siguientes operaciones ocurren en los siguientes momentos:

  • T1: El escalador automático inicia una operación de reducción de servicio de retiro de servicio ordenado reducir la escala verticalmente de aproximadamente la mitad de los trabajadores del clúster actuales.

  • T2: El escalador automático continúa supervisando las métricas del clúster. No cambia su recomendación de reducción de escala y continúa la operación de reducción de escala. Algunos trabajadores se retiraron de servicio y otros están dando de baja (Dataproc borrará a los trabajadores retirados).

  • T3: El escalador automático calcula que la cantidad de trabajadores puede reducirse aún más posiblemente debido a que hay memoria YARN adicional disponible. Sin embargo, como el de trabajadores activos más el cambio recomendado en la cantidad de trabajadores no es igual o mayor que el número de más activos retirar de servicio de los trabajadores, los criterios para la cancelación de la reducción de escala no se cumplen, y el escalador automático no cancela la operación de reducción de escala.

  • T4: YARN informa un aumento en la memoria pendiente. Sin embargo, el escalador automático no cambia su recomendación de cantidad de trabajadores. Al igual que en el T3, la reducción los criterios de cancelación siguen siendo insatisfactorios, y el escalador automático no cancela la operación de reducción de escala.

  • T5: Aumentos de memoria pendientes de YARN y cambio en la cantidad de trabajadores según lo recomendado por el escalador automático. Sin embargo, como la cantidad de trabajadores activos además del cambio recomendado en la cantidad de trabajadores es menor que el de trabajadores activos y de retiro de servicio, los criterios de cancelación insatisfecha y la operación de reducción de escala no se cancela.

  • T6: La memoria pendiente de YARN aumenta aún más. La cantidad de trabajadores activos más el el cambio en la cantidad de trabajadores que recomienda el escalador automático ahora es mayor que la cantidad de trabajadores activos y que retiraron el servicio. Los criterios de cancelación se cumplen y el escalador automático cancela la operación de reducción de escala.

  • T7: El escalador automático está esperando que se cancele la operación de reducción de escala para que se completó. El escalador automático no evalúa ni recomienda un cambio la cantidad de trabajadores durante este intervalo.

  • T8: Se completa la cancelación de la operación de reducción de escala. Cómo retirar de servicio a los trabajadores se agregan al clúster y se activan. El escalador automático detecta finalización de la cancelación de la operación de reducción de escala y espera período de evaluación (T9) para calcular la cantidad recomendada de trabajadores.

  • T9: No hay operaciones activas en el momento de T9. Según la política de escalador automático y YARN, el escalador automático recomienda una operación de escalamiento vertical.