Ir para o conteúdo principal

Introdução 

 

Nota: Este Blog foi escrito pela CERC, cliente Databricks.

No dia-a-dia, utilizamos diversos recursos tecnológicos, onde é um grande desafio integrá-los e manter a catalogação sempre unificada. Assim, iremos falar sobre como conectar o Unity Catalog ao Data Catalog.

A situação específica que originou essa necessidade é: a maior parte de nossos conjuntos de data assets do ambiente informacional estarem no recurso Databricks e atualmente termos o catálogo de dados corporativo utilizando o GCP-Data Catalog, que para as tabelas dentro da GCP, já as mapeia imediatamente.

Com isso, realizamos a primeira conexão entre estes recursos com um repositório de códigos que captura metadados definidos a partir do Hive Metastore do Databricks, integrando no conjunto GCP.

Com esse primeiro desafio de conexão superado, a companhia entendeu por melhor estratégia utilizar o produto Databricks-Unity Catalog, onde tivemos que repensar nossa integração, de forma que ela fosse capaz de manter os ativos de dados migrados para o novo produto e também manter os data assets existentes com origem no Hive Metastore. 

Entretanto nos deparamos com a falta de solução para integrar o Databricks Unity Catalog e o Google Data Catalog, assim surgindo a solução que iremos apresentar aqui.

O Unity Catalog possui a capacidade de utilizar APIs para acesso aos metadados do Databricks, sendo que na versão com Hive Metastore esta técnica não era possível.  

O GCP- Data Catalog já fornecia o uso de APIs, então com isso, foi possível criar uma conexão funcional entre os catálogos de forma simplificada em comparação às propostas de conexões anteriores.

Com isso podemos chegar em uma solução mais fácil de ser implementada com base nas documentações de cada recurso com algumas bibliotecas para as conexões com o GCP-Data Catalog. O diagrama abaixo mostra o direcionamento de nossa estratégia onde,  os metadados e catalogação se concentram no GCP-Data Catalog, adicionando os metadados do Databricks extraídos do Unity Catalog através do conector.

Neste blog iremos explorar como:

  • Criar os requests dos metadados do Databricks;
  • Construir o processo de conexão entre Unit Catalog e Data Catalog;
  • Postar as alterações com a API do Data Catalog.

Obs.: Deve ser executado em um máquina de Databricks Runtime 10.4, ou até que a google faça uma atualização em sua biblioteca datacatalog_connectors.commons

 

Criar os requests dos metadados do Databricks

 

Com a ajuda do time da Databricks e as documentações do Unity Catalog, foi possível criar a classe responsável por extrair as informações da API do Unity Catalog, e organizá-los em uma lista de assets.

Estes assets são compostos por alguns elementos principais como nome da tabela, seu schema, nome do catálogo, colunas, os tipos das colunas, data de criação e data de atualização (completar infos e colar exemplos) 

 A classe, após fazer os requests no Unity Catalog, organiza as informações na disposição citada anteriormente (list(dict{asset}).

class RetriveMetadataFromUC:
   """The class is responsible for retrieving Databricks Unity Catalog metadata such as catalog name, table schema, table name, creation and update timestamps and its columns, and other components."""
   def __init__(self):
       self.catalogs = [
           catalog["name"]
           for catalog in query_uc_api(http, "unity-catalog/catalogs", {})["catalogs"]
       ]  
       self.all_schemas=[]
       self.all_tables=[]
   def _retrieve_schemas(self):
       """ """
       logging.info("Creating JSON from the UC Schemas")
       for catalog in self.catalogs:
           try:
               schemas = [
                   (catalog, schema["name"])
                   for schema in query_uc_api(
                       http, "unity-catalog/schemas", {"catalog_name": catalog}
                   )["schemas"]
               ]
               self.all_schemas.extend(schemas)
           except Exception as error:
               logging.error(f"Error in _retrieve_schema: {error}")
               continue
       logging.info("Schema JSONs Created!")
       self.schemasDF = spark.read.json(sc.parallelize(schemas))
       logging.debug(f"""Dataframe schemas:
                     {self.schemasDF}""")
       return self.all_schemas
   def get_tables(self):
       all_schemas=self._retrieve_schemas()
       logging.info("Creating list of table infos")
       for catalog,schema in all_schemas:
           tables_query = query_uc_api(
               http, "unity-catalog/tables", {"catalog_name": catalog, "schema_name": schema}
           )
           if "tables" in tables_query:
               for table in tables_query["tables"]:
                   self.all_tables.append(table)
       logging.info("Lista de tabelas criada!")
       return self.all_tables

 

Construir o processo de conexão entre Unit Catalog e Google Data Catalog

 

O processo da conexão utiliza principalmente a biblioteca disponibilizada pela google (google-datacatalog-connectors-common) onde são aplicadas abstrações das APIs do Data Catalog com este objetivo. Assim, estando em mente o resultado da classe RetriveMetadataFromUC, é criada uma nova classe que prepara os objetos Entry para serem exportados posteriormente para o sistema GCP, ou seja, para cada asset (tabela do Databrick) é criado um objeto Entry. Podemos observar também que a classe PrepareDatabricksMetadata está herdando as propriedades da classe BaseEntryFactory, que advém da biblioteca google mencionada anteriormente, assim podemos utilizar alguns métodos que facilitam formatações.

class PrepareDatabricksMetadata(base_entry_factory.BaseEntryFactory):
   __ENTRY_ID_INVALID_CHARS_REGEX_PATTERN = r'[^a-zA-Z0-9_]+'
   def __init__(self,project_id,location_id,entrygroup_id):
       self.__project_id=project_id
       self.__location_id=location_id
       self.__entry_group_id=entrygroup_id
    
       self.catalog_client=datacatalog_v1.DataCatalogClient()


   def get_entry_by_name(self, name):
       """Retrieves Data Catalog Entry.
       :param name: The Entry name.
       :return: An Entry object if it exists.
       """
       return self.catalog_client.get_entry(name=name)


   def _create_entries(self,asset_info):
       entry_id= self.__make_entry_id_for_table(asset_info["schema_name"],
           asset_info["name"])
       entry=datacatalog_v1.Entry()
       entry.user_specified_type = asset_info["table_type"]
       entry.user_specified_system = 'UNITY_CATALOG'
       entry.display_name = self._format_display_name(asset_info["name"])
       entry.name = datacatalog_v1.DataCatalogClient.entry_path(
           self.__project_id, self.__location_id, self.__entry_group_id,
           entry_id)   
       data_url=f"https://{workspace_url}/explore/data/{asset_info['catalog_name']}/{asset_info['schema_name']}/{asset_info['name']}"
       entry.linked_resource = \
           self._format_linked_resource(data_url)
       created_timestamp = timestamp_pb2.Timestamp()
       created_timestamp.FromMilliseconds(asset_info['created_at'])
       entry.source_system_timestamps.create_time = created_timestamp
       updated_timestamp = timestamp_pb2.Timestamp()
       updated_timestamp.FromMilliseconds(asset_info['updated_at'])
       entry.source_system_timestamps.update_time = updated_timestamp


       columns = []                                                               
       schema = datacatalog_v1.types.Schema()
       for column in asset_info['columns']:
           columns.append(                                                            
               datacatalog_v1.types.ColumnSchema(                                       
                   column=format_entry_column_name(
                       column["name"]),
                   type=format_entry_column_type(
                       column["type_name"])
               )
           )
       schema.columns = columns
       entry.schema = schema
       try:
           persisted_entry = self.get_entry_by_name(
               entry.name
           )


           entry = self.fill_column_description(
               entry,
               persisted_entry
           )
       except:
           pass
       return entry_id, entry
   def __make_entry_id_for_table(self, schema_name, table):
       # We normalize and hash first the database_name.
       normalized_database_name = self._format_id_with_hashing(
           schema_name.lower(),
           regex_pattern=self.__ENTRY_ID_INVALID_CHARS_REGEX_PATTERN)


       # Next we do the same for the table name.
       normalized_table_name = self._format_id_with_hashing(
           table.lower(),
           regex_pattern=self.__ENTRY_ID_INVALID_CHARS_REGEX_PATTERN)


       entry_id = '{}__{}'.format(normalized_database_name,
                                  normalized_table_name)
       return entry_id

Aqui é importante mencionar um detalhe: fazer uma atualização de uma Entry já catalogada com informações vazias pelo conector, irá remover as informações que estão no Data Catalog, assim a função da variável persisted_entry é justamente escrever a entry com as informações que estão no catálogo.

Além disso, uma característica da entry que está sendo criada é o link que vai ser redirecionado ao Databricks quando acessado no Data Catalog, assim, quando criada a entry terá todo seu conteúdo de metadados exportado ao Data Catalog.

Dado que a classe PrepareDatabricksMetadata faz as preparações a partir de cada asset, foi necessário criar uma classe que itere sob a lista de assets (resultado final da API do UC):

class AssembleEntries:
   def __init__(self, project_id, location_id,
                entry_group_id,
               db_tables_list=None
                ):
       self.db_tables_list=db_tables_list
       self.__datacatalog_preparing = \
           PrepareDatabricksMetadata(
               project_id,location_id,entry_group_id)
   def make_entry_for_tables(self):
       entries = []
       for table_dict in self.db_tables_list:
           entry_id, entry = self.\
               __datacatalog_preparing._create_entries(table_dict)


           entries.append(prepare.AssembledEntryData(entry_id, entry))
       return entries

Antes da classe final, que irá mandar as entries criadas para o Data Catalog, é preciso criar um processo para manutenção das tabelas deletadas no Databricks, ou seja, quando haver ainda uma entrie no entryGroups designado para o Databricks, mas ela já não mais existir como tabela no Databricks, será deletada, assim primeiro será buscado as entries, depois será criada a comparação e criará a classe de deleção.

class GetTablesFromGCP():
   def __init__(self,project,location,entry_group):
       self.client=datacatalog_v1.DataCatalogClient()
       self.gcp_tables_list=[]
       self.project=project
       self.location=location
       self.entry_group=entry_group
      
   def get_list_of_assets_from_datacatalog(self):
       try:
           uc_entrie_group=f"projects/{self.project}/locations/{self.location}/entryGroups/{self.entry_group}"
           request = datacatalog_v1.ListEntriesRequest(
           parent=uc_entrie_group,
           )
           page_result = self.client.list_entries(request=request)
           for response in page_result:
               if response.user_specified_system =='UNITY_CATALOG':
                   self.gcp_tables_list.append(response.name)
           return self.gcp_tables_list
       except Exception as e:
           logging.error(e)

Assim a lista deverá ser iterada sob o processo de deleção:

class DeleteEntriesForDataCatalog(base_entry_factory.BaseEntryFactory):
   __ENTRY_ID_INVALID_CHARS_REGEX_PATTERN = r'[^a-zA-Z0-9_]+'
   def __init__(self,project,location,entry_group):
       self.delete_list=[]
       self.project=project
       self.location=location
       self.entry_group=entry_group
       self.gcp_assets=GetTablesFromGCP(self.project,self.location,self.entry_group).get_list_of_assets_from_datacatalog()
       self.tables_in_unity_catalog=RetriveMetadataFromUC().get_tables()
       self.complete_tables_in_db=[]
       self.__datacatalog=datacatalog_v1.DataCatalogClient()


  
   def _normalize_tables(self):
       for asset in self.tables_in_unity_catalog:
           full_name_asset=self.__make_entry_id_for_table(asset['schema_name'],asset['name'])
           self.complete_tables_in_db.append(full_name_asset)
       return self.complete_tables_in_db


   def get_difference_between_gcp_db(self):
       for gcp_asset in self.gcp_assets:
           if (gcp_asset.split("/"))[7] not in self._normalize_tables():
               self.delete_list.append(gcp_asset)


   def delete_assets(self):
       if not self.delete_list:
           logging.info("Nothing to Delete")
       else:


           for asset in self.delete_list:
               logging.info(f"Deleting {asset}")
               self.delete_entry(asset)



   def delete_entry(self, name):
       """Deletes a Data Catalog Entry.


       :param name: The Entry name.
       """
       try:
           self.__datacatalog.delete_entry(name=name)
       except Exception as e:
           logging.info(
               'An exception ocurred while attempting to'
               ' delete Entry: %s', name)
           logging.debug(str(e))


   def __make_entry_id_for_table(self, schema_name, table):
       # We normalize and hash first the database_name.
       normalized_database_name = self._format_id_with_hashing(
           schema_name.lower(),
           regex_pattern=self.__ENTRY_ID_INVALID_CHARS_REGEX_PATTERN)


       # Next we do the same for the table name.
       normalized_table_name = self._format_id_with_hashing(
           table.lower(),
           regex_pattern=self.__ENTRY_ID_INVALID_CHARS_REGEX_PATTERN)


       entry_id = '{}__{}'.format(normalized_database_name,
                                  normalized_table_name)
       return entry_id

 

Postar as alterações com a API do Data Catalog

 

Finalmente é possível a criação da classe final, que irá condensar as classes anteriores em uma utilização única, onde seus inputs serão o projeto, local e entryGroup do GCP em que os metadados do Databricks serão enviados:

class ExportDatabricksMetadataToDataCatalog():


   def __init__(self,
                project_id,
                location_id,
                entry_group_id="uc",
                db_tables_list=None):
       self.__entry_group_id = entry_group_id
       self.__project_id = project_id
       self.__location_id = location_id
       self.__db_tables_list=db_tables_list




   def run(self):
       logging.info('\n==============Start uc-to-datacatalog============')
       logging.info('\n\n==============Exctracting UC API===============')


       logging.info('\n{}'.format(len(self.__db_tables_list)) +
               ' tables ready to be ingested...')
      
       logging.info('\n\n==============Prepare metadata===============')
       logging.info('\nPreparing the metadata...')


       factory = AssembleEntries(self.__project_id, self.__location_id,
                self.__entry_group_id,self.__db_tables_list)
       prepared_entries = factory.make_entry_for_tables()


       logging.info("\n =============Delete metadata===============")
       delete_factory=DeleteEntriesForDataCatalog(self.__project_id, self.__location_id,self.__entry_group_id)
       if not delete_factory.gcp_assets:
           logging.info("Nothing in Data Catalog yet...")
       else:
           delete_factory.get_difference_between_gcp_db()
           delete_factory.delete_assets()
       logging.info('\n==============Ingest metadata===============')
       self.__ingest_created_or_updated(prepared_entries)


   def __ingest_created_or_updated(self, prepared_entries):
       ingestor = datacatalog_metadata_ingestor.DataCatalogMetadataIngestor(
           self.__project_id, self.__location_id, self.__entry_group_id)
       ingestor.ingest_metadata(prepared_entries)

Com isto para a execução do notebook e manter um catálogo atualizado, é preciso então executar a função run, com os inputs da classe preenchido:

__PROJECT_ID = f"{GCP_PROJECT}"
__LOCATION = f"{GCP_LOCATION}"
db_to_datacatalog=ExportDatabricksMetadataToDataCatalog(
               project_id=__PROJECT_ID,
               location_id=__LOCATION,
               entry_group_id="uc",
               db_tables_list=RetriveMetadataFromUC().get_tables())
db_to_datacatalog.run()

Onde __PROJECT_ID é o projeto e o __LOCATION é o local do GCP que serão armazenados os metadados do Unity Catalog, sempre no caminho: projects/__PROJECT_ID/locations/__LOCATION/entryGroups/uc/entries/entry_id dentro do Data Catalog.

Assim, um exemplo de uma entry criada no Data Catalog seria:

Resumo

 

Nós demonstramos que hoje é simples criar uma conexão entre Unity Catalog e Data Catalog, a ideia principal é que se construa diretamente no sistema GCP uma Entry (Metadado da tabela) customizada do catálogo partindo do Databricks. Além disso, toda a construção do conector foi feita de forma modular, partindo de cada classe que o compõem e com isso permite em si customizações, tanto da arquitetura para que ele foi construído quanto para como ele modifica cada metadado. Para referência o código .dbc completo encontra-se nesse link.

Experimente o Databricks gratuitamente
Ver tudo Tutoriais posts