Apresentando a Captura de Dados de Mudança Mais Fácil no Apache Spark™ Structured Streaming
Summary
- Rastreamento de estado simplificado: Novas funcionalidades de feed de alterações e instantâneos na API State Reader simplificam a depuração e análise de mudanças de estado no Apache Spark™ Structured Streaming.
- Desenvolvimento acelerado: Acompanhe as mudanças de estado em micro-lotes com consultas mínimas e reconstrua instantâneos de estado para diagnósticos precisos.
- Acessibilidade aprimorada: Simplifique insights para engenheiros e usuários de negócios, permitindo a integração perfeita com painéis e ferramentas de análise.
Este blog descreve as novas capacidades de feed de alterações e instantâneo na API State Reader do Apache Spark™ Structured Streaming. A API State Reader permite que os usuários acessem e analisem os dados de estado interno do Structured Streaming. Os leitores aprenderão como aproveitar as novas funcionalidades para depurar, solucionar problemas e analisar mudanças de estado de forma eficiente, tornando as cargas de trabalho de streaming mais fáceis de gerenciar em grande escala.
Uma maneira simples de lidar com mudanças de estado
Na paisagem em constante evolução da engenharia de dados, o Apache Spark Structured Streaming tornou-se uma pedra angular para o processamento de dados em tempo real em grande escala. No entanto, à medida que as cargas de trabalho em streaming aumentam em complexidade, também aumenta o desafio de desenvolver, depurar e solucionar problemas desses sistemas. Em março de 2024, a Databricks deu um passo significativo ao introduzir a API do Leitor de Estado, uma ferramenta poderosa projetada para enfrentar esses desafios diretamente, facilitando a consulta de dados e metadados do estado.
A Databricks introduziu melhorias significativas na API do Leitor de Estado, ampliando suas capacidades existentes para simplificar ainda mais o rastreamento e análise de estado. Essas melhorias aproveitam os dados do changelog da loja de estado para fornecer um feed de alterações com saída no formato padrão de Captura de Dados de Alteração (CDC). Outra nova capacidade ajuda a gerar uma visão do estado usando snapshots preferidos no diretório de checkpoint.
Neste post do blog, vamos nos aprofundar nessas novas funcionalidades, demonstrando como elas simplificam o rastreamento de mudanças de estado, a auditoria de transformação de dados e a reconstrução de instantâneos de estado. Os benefícios do feed de alterações aceleram o desenvolvimento, oferecendo um método mais simples para observar as alterações de valor de estado ao longo do tempo. Embora fosse possível com a versão anterior da API State Reader, era necessário mais código para iterar e inspecionar diferentes versões de estado. Agora, apenas algumas opções são suficientes para construir o feed de alterações.
Além do desenvolvimento e teste, essas melhorias facilitam a acessibilidade dos dados para os analistas. Por exemplo, uma consulta agendada agora poderia facilmente popular as visualizações do painel AI/BI, preenchendo a lacuna entre dados complexos de streaming e insights acionáveis.
Pré-requisitos:
O Feed de Mudanças da API de Leitura de Estado requer que a verificação de estado baseada em delta esteja ativada. Aqui, "delta" significa "diff", não Delta Lake. A implementação do armazenamento de estado com suporte a HDFS usa verificação de estado baseada em delta por padrão. Ao usar a implementação do armazenamento de estado baseada no RocksDB, uma configuração adicional do Spark é necessária para habilitar o checkpointing do changelog.
Revisão da API State Reader
O formato básico do armazenamento de estado tem as seguintes opções:
- batchId: o lote alvo para o qual queremos ler os valores do armazenamento de estado. Se não especificado, o último batchId é usado por padrão.
- operatorId: o operador alvo para o qual os valores do armazenamento de estado são procurados. O valor padrão é 0. Se existirem vários operadores com estado no fluxo, o estado dos outros operadores pode ser acessado usando esta opção.
- storeName: Isso representa o nome do armazenamento de estado alvo de onde ler. Esta opção é usada quando o operador com estado usa várias instâncias de armazenamento de estado. Ou o nome da loja ou o lado da junção devem ser especificados para uma junção de fluxo-fluxo, mas não ambos.
- joinSide: Esta opção é usada quando os usuários querem ler o estado de uma junção de fluxo-fluxo. Se esta opção for usada, o valor da opção esperado fornecido pelo usuário é "direito" ou "esquerdo".
O esquema do DataFrame de saída inclui as seguintes colunas:
- chave: a chave para um registro de operador com estado no checkpoint de estado.
- valor: o valor para um registro de operador com estado no checkpoint de estado.
- partition_id: a partição de checkpoint que contém o registro do operador com estado.
As opções básicas necessárias para o formato do armazenamento de estado são úteis para entender o que estava no armazenamento de estado para um determinado batchId.
Exemplo
O exemplo abaixo mostra como o formato de fonte de dados statestore Spark nos ajuda a consultar dados da loja de estados. Imagine que estamos investigando o valor de contagem do userId 8. Antes das novas opções da API do Leitor de Estado, que revisaremos na próxima seção, se quiséssemos observar a mudança dos dados do userId 8 ao longo de micro-lotes, teríamos que executar novamente a consulta abaixo para vários batchIds (veja a linha 3 da primeira célula abaixo).
Antes das novas opções, observar a mudança do valor de uma chave era tedioso e exigiria várias consultas. Vamos agora ver como as novas opções facilitam isso.
Apresentando novas opções
As seguintes novas opções fazem parte das novas capacidades de feed de alterações da API do Leitor de Estado:
Opção | Comentário | |
---|---|---|
Feed de alterações | ||
readChangeFeed | Quando "true" habilita a saída do feed de mudança. | |
changeStartBatchId | Obrigatório O batchId no qual o feed de alterações deve começar. | |
changeEndBatchId | Opcional. O último lote a ser usado no feed de altera ções. | |
Snapshot | ||
snapshotPartitionId | Necessário ao usar snapshotStartBatchId. Se especificado, apenas esta partição específica será lida. | |
snapshotStartBatchId | Necessário ao usar snapshotPartitionId. | |
snapshotEndBatchId ou batchId | Opcional. O último lote a ser usado na geração dos valores de snapshot. |
Tenha cuidado com os valores usados para as opções de batchId. Por padrão, cem checkpoints históricos e arquivos de estado relacionados são mantidos. A propriedade spark.sql.streaming.minBatchesToRetain
pode ser usado para substituir o valor padrão. Se você tentar acessar os dados de estado de um lote que envelheceu e não existe mais, você verá uma mensagem de erro como esta: [STDS_OFFSET_LOG_UNAVAILABLE] O log de deslocamento para 92 não existe, localização do checkpoint: /Volumes/mycheckpoint-path.
Exemplo de feed de alterações
No exemplo abaixo, usamos o feed de alterações para observar mudanças para a chave userId
valor 8. O campo change_type
pode ser útil durante o desenvolvimento, depuração ou ao investigar um problema de dados de produção. Os dados do feed de alterações permitem que você veja rapidamente como o valor de uma chave mudou ao longo de várias micro-batches. No exemplo abaixo, onde a chave de estado inclui uma janela, você pode ver como o partition_id também mudou.
Exemplo de snapshot
A corrupção do armazenamento de estado é improvável devido à tolerância a falhas do Apache Spark, onde micro-lotes são planejados (os deslocamentos são escritos no local de checkpoint) e os commits são concluídos (e sincronizados com os dados de estado no local de checkpoint). No entanto, erros humanos ou bugs são sempre possíveis. O recurso de captura de tela da API de Leitura de Estado pode ser uma ferramenta útil para reconstruir o estado a partir dos dados do changelog, ignorando os arquivos de captura de tela subsequentes. O recurso requer um batchId inicial (via a opção snapshotStartBatchId) para o qual existe um arquivo de captura de tela. Começando com o lote snapshotStartBatchId batchId, o recurso de snapshot da API do Leitor de Estado construirá uma imagem do estado com base no batchId inicial e terminando no batchId especificado com a opção snapshotEndBatchId.
Se estiver usando o armazenamento de estado RocksDB, a estrutura de arquivos subjacente se parece com isto:
Para construir uma imagem do estado a partir do lote 1800, usando o snapshot inicial do estado fotografado 1740.zip, você usaria um código que se parece com isso:
Você pode notar que na imagem listando os arquivos de checkpoint, os dados capturados estão em 1740.zip, enquanto ao usar a API de Leitura de Estado, usamos um snapshotStartBatchId de 1741. A razão é que a convenção de nomenclatura de arquivos usa um índice base 1, enquanto os números de batchId no Spark UI começam em 0.
Conclusão
As novas funcionalidades da API do Leitor de Estado abrem novas oportunidades para auditoria, exploração e visualização de alterações de estado. As novas funcionalidades ajudarão os desenvolvedores a serem mais eficientes porque, caso contrário, são necessárias consultas separadas para extrair os valores do estado em uma variedade de lotes. No entanto, os potenciais beneficiários da nova funcionalidade vão além do desenvolvimento e do pessoal de suporte. Os stakeholders de negócios também podem estar interessados nas percepções possíveis ao analisar os dados do feed de alterações. Em qualquer caso, a construção de consultas e painéis para exibir os dados agora é mais fácil, graças às melhorias da API do Leitor de Estado.
Em conclusão, o feed de alterações permite o rastreamento detalhado das alterações de estado em micro-lotes, oferecendo insights inestimáveis durante as fases de desenvolvimento e depuração. O recurso de snapshot é uma ferramenta de diagnóstico útil, permitindo que os engenheiros reconstruam o estado a partir de arquivos de log de alterações para construir uma visão completa do estado em um ponto específico (batchId).
Você pode ler mais sobre a API do Leitor de Estado aqui, ou ver uma demonstração aqui.
(This blog post has been translated using AI-powered tools) Original Post