Como baixar io.confluent.connect.jdbc.jdbcsourceconnector
Se quiser importar dados de qualquer banco de dados relacional com um driver JDBC para um tópico Apache Kafka, você pode usar o conector Kafka Connect JDBC Source. Esse conector pode suportar uma ampla variedade de bancos de dados e modos de consulta e pode ser executado no modo autônomo ou distribuído. Neste artigo, mostraremos como baixar e configurar o conector e como usá-lo para transmitir dados de um banco de dados para o Kafka.
O que é io.confluent.connect.jdbc.jdbcsourceconnector e o que ele faz?
O io.confluent.connect.jdbc.jdbcsourceconnector é uma classe Java que implementa a interface SourceConnector do Kafka Connect. Faz parte da plataforma Confluent, que é uma distribuição do Apache Kafka que fornece recursos e integrações adicionais. O conector permite executar periodicamente uma consulta SQL em um banco de dados e criar um registro de saída para cada linha no conjunto de resultados. Os registros de saída são então enviados para um tópico Kafka, onde podem ser consumidos por outros aplicativos ou conectores.
download io.confluent.connect.jdbc.jdbcsourceconnector
Quais são os benefícios e casos de uso de usá-lo?
O uso do conector de origem JDBC tem vários benefícios e casos de uso, como:
Simplifica a integração de dados entre bancos de dados e Kafka, pois você não precisa escrever nenhum código personalizado ou usar nenhuma ferramenta intermediária.
Ele suporta modos de consulta incremental, o que significa que ele pode rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas, e apenas buscar os dados relevantes do banco de dados.
Ele permite que você transmita dados de várias tabelas ou esquemas, usando listas brancas ou listas negras ou consultas personalizadas.
Ele permite que você aproveite a escalabilidade e a tolerância a falhas do Kafka Connect, pois pode executar o conector no modo autônomo ou distribuído, dependendo de suas necessidades.
Ele permite que você use o Kafka como um hub central para pipelines de dados, análise de streaming, integração de dados e aplicativos de missão crítica.
Pré-requisitos
Antes de fazer download e configurar o conector de origem JDBC, você precisa ter os seguintes pré-requisitos:
Um banco de dados relacional com um driver JDBC. Para este artigo, usaremos o MySQL como exemplo, mas você pode usar qualquer banco de dados que tenha um driver compatível.
Um cluster Kafka com pelo menos um agente e um nó ZooKeeper. Você pode usar a plataforma Confluent ou o Apache Kafka para configurar seu cluster.
Kafka Connect, que está incluído na plataforma Confluent ou Apache Kafka. Você precisa ter pelo menos um nó de trabalho do Kafka Connect em execução.
O driver JDBC para seu banco de dados. Você precisa baixar o arquivo JAR do driver e colocá-lo em um diretório acessível pelo Kafka Connect.
Como instalar o Kafka Connect e o driver JDBC?
Para instalar o Kafka Connect, você pode seguir as instruções de , dependendo de qual distribuição você está usando. Você pode executar o Kafka Connect no modo autônomo (para desenvolvimento e teste) ou no modo distribuído (para produção).
Para instalar o driver JDBC para seu banco de dados, você pode seguir estas etapas:
Faça download do arquivo JAR do driver no site do fornecedor do banco de dados. Por exemplo, para MySQL, você pode baixá-lo em .
Crie um diretório para o arquivo JAR do driver em seu nó do trabalhador do Kafka Connect. Por exemplo, /usr/share/java/kafka-connect-jdbc.
Copie o arquivo JAR do driver para o diretório que você criou.
Etapas para baixar e configurar o conector
Agora que você tem os pré-requisitos prontos, pode fazer o download e configurar o conector de origem JDBC. Você pode baixar o conector do Confluent Hub ou do repositório Maven, dependendo de sua preferência.
Como baixar o conector do repositório Confluent Hub ou Maven?
Para baixar o conector do Confluent Hub, você pode seguir estas etapas:
Vá para e procure por "Conector de Origem JDBC".
Selecione a versão mais recente do conector e clique em "Download".
Extraia o arquivo ZIP para um diretório de sua escolha. Por exemplo, /usr/share/confluent-hub-components.
Adicione o diretório à propriedade plugin.path em seu arquivo de configuração de trabalho do Kafka Connect. Por exemplo, plugin.path=/usr/share/confluent-hub-components.
Reinicie o nó do trabalhador do Kafka Connect.
Para baixar o conector do repositório Maven, você pode seguir estas etapas:
Vá para e procure por "io.confluent.connect.jdbc.jdbcsourceconnector".
Selecione a versão mais recente do conector e clique em "Baixar JAR".
Crie um diretório para o arquivo JAR do conector em seu nó do trabalhador Kafka Connect. Por exemplo, /usr/share/java/kafka-connect-jdbc.
Copie o arquivo JAR do conector para o diretório que você criou.
Adicione o diretório à propriedade plugin.path em seu arquivo de configuração de trabalho do Kafka Connect. Por exemplo, plugin.path=/usr/share/java/kafka-connect-jdbc.
Reinicie o nó do trabalhador do Kafka Connect.
Como criar um arquivo de configuração para o conector?
Para criar um arquivo de configuração para o conector, você precisa especificar algumas propriedades obrigatórias e opcionais. As propriedades obrigatórias são:
nome: um nome exclusivo para a instância do conector.
conector.classe: O nome da classe Java para o conector. Nesse caso, é io.confluent.connect.jdbc.JdbcSourceConnector.
tarefas.max: o número máximo de tarefas que o conector pode executar em paralelo.
conexão.url: a URL de conexão JDBC para seu banco de dados. Você precisa incluir o nome do banco de dados, host, porta e quaisquer outros parâmetros exigidos pelo seu driver.
modo: o modo de consulta para o conector. Você pode escolher entre os modos em massa, de incremento, carimbo de data/hora ou carimbo de data/hora+incremento, dependendo de como deseja buscar dados do seu banco de dados.
tópico.prefixo: o prefixo a ser usado para os nomes dos tópicos Kafka para onde os dados serão enviados.O conector anexará o nome da tabela ou da consulta ao prefixo para formar o nome completo do tópico.
As propriedades opcionais são:
table.whitelist: Uma lista separada por vírgulas de tabelas a serem incluídas na cópia. Se especificado, apenas essas tabelas serão processadas.
tabela.lista negra: uma lista separada por vírgulas de tabelas a serem excluídas da cópia. Se especificado, essas tabelas serão ignoradas.
consulta: uma consulta SQL personalizada para executar e copiar dados. Se especificada, esta consulta substituirá qualquer configuração table.whitelist ou table.blacklist.
incrementando.coluna.nome: o nome de uma coluna em seu banco de dados que aumenta monotonicamente e é exclusivo para cada linha. Esta coluna será usada para rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas. Esta propriedade é necessária se você usar os modos de incremento ou carimbo de data/hora+incremento.
timestamp.column.name: o nome de uma coluna em seu banco de dados que contém um valor de registro de data e hora que indica quando cada linha foi modificada pela última vez. Esta coluna será usada para rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas. Essa propriedade é necessária se você usar os modos carimbo de data/hora ou carimbo de data/hora+incremento.
validar.não.nulo: um valor booleano que indica se o conector deve validar se todas as colunas de chave primária não são nulas em cada linha. Se definido como verdadeiro, o conector falhará se qualquer coluna de chave primária for nula. Se definido como falso, o conector irá ignorar valores nulos e usá-los como parte da chave. O valor padrão é verdadeiro.
lote.max.linhas: o número máximo de linhas a serem incluídas em cada lote ao pesquisar novos dados. Essa configuração pode ajudar a reduzir o uso de memória e o tráfego de rede. O valor padrão é 100.
enquete.intervalo.ms: a frequência em milissegundos para pesquisar dados novos ou atualizados. Essa configuração pode afetar a latência e a taxa de transferência do conector. O valor padrão é 5000.
tabela.tipos: uma lista separada por vírgulas de tipos de tabela para incluir na cópia.Os tipos suportados são TABLE, VIEW, SYSTEM TABLE, GLOBAL TEMPORARY, LOCAL TEMPORARY, ALIAS e SYNONYM. Por padrão, apenas TABLE é incluído.
numeric.mapping: Um mapeamento de tipos numéricos do banco de dados para tipos do Kafka Connect. Os mapeamentos suportados são best_fit (o padrão), precisão_apenas e nenhum.
dialeto.nome: o nome do dialeto do banco de dados que o conector deve usar. Por padrão, o conector detectará automaticamente o dialeto com base na URL de conexão JDBC. Você também pode especificar um nome de classe de dialeto customizado se tiver implementado um.
citação.sql.identificadores: uma configuração que controla como o conector cita identificadores, como nomes de tabelas e colunas em instruções SQL. Os valores suportados são sempre (o padrão), nunca ou automático (somente quando necessário).
timestamp.delay.interval.ms: o atraso em milissegundos para permitir entre o horário atual e o horário mais recente que é usado para consultar tabelas baseadas em carimbo de data/hora. Essa configuração pode ajudar a evitar dados ausentes que ainda não foram confirmados no banco de dados. O valor padrão é 0.
db.timezone: o nome do fuso horário Java que o conector deve usar para interpretar os valores de data e hora do banco de dados. Por padrão, o conector usará UTC. Você também pode especificar um ID de fuso horário personalizado, se tiver um.
Aqui está um exemplo de um arquivo de configuração para o conector de origem JDBC:
"name": "jdbc-source-connector", "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root", "mode": "incrementing", "incrementing.column.name": "id", "topic .prefix": "teste-", "table.whitelist": "clientes, pedidos"
Como especificar as propriedades de conexão, modos de consulta e nomes de tópicos?
Para especificar as propriedades de conexão, modos de consulta e nomes de tópicos para o conector, você precisa usar os seguintes parâmetros em seu arquivo de configuração:
Parâmetro
Descrição
Exemplo
conexão.url
A URL de conexão JDBC para seu banco de dados. Você precisa incluir o nome do banco de dados, host, porta e quaisquer outros parâmetros exigidos pelo seu driver.
"connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root"
modo
O modo de consulta para o conector. Você pode escolher entre os modos em massa, de incremento, carimbo de data/hora ou carimbo de data/hora+incremento, dependendo de como deseja buscar dados do seu banco de dados.
"modo": "incrementando"
tópico.prefixo
O prefixo a ser usado para os nomes dos tópicos Kafka para onde os dados serão enviados. O conector anexará o nome da tabela ou da consulta ao prefixo para formar o nome completo do tópico.
"topic.prefix": "teste-"
O parâmetro connection.url é obrigatório e depende do seu tipo de banco de dados e driver. Você pode encontrar alguns exemplos de URLs de conexão para diferentes bancos de dados .
O parâmetro mode também é obrigatório e determina como o conector consulta os dados do seu banco de dados. Existem quatro modos possíveis:
Modo em massa: Nesse modo, o conector executará uma verificação completa de cada tabela e enviará todas as linhas para Kafka. Este modo é adequado para tabelas que não possuem nenhuma coluna de chave primária ou carimbo de data/hora, ou para tabelas que não mudam com frequência.
Modo de incremento: Nesse modo, o conector usará uma coluna de aumento monotônico (como um ID de incremento automático) para rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas. Este modo é adequado para tabelas que possuem uma coluna de chave primária que está sempre aumentando.
Modo de carimbo de data/hora: Nesse modo, o conector usará uma coluna de carimbo de data/hora (como a data da última modificação) para rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas. Este modo é adequado para tabelas que possuem uma coluna timestamp que é atualizada sempre que uma linha é inserida ou modificada.
Carimbo de data/hora+modo de incremento: Nesse modo, o conector usará uma combinação de uma coluna de carimbo de data/hora e uma coluna de incremento para rastrear quais linhas foram processadas e quais linhas são novas ou atualizadas. Este modo é adequado para tabelas que possuem uma coluna de carimbo de data/hora e uma coluna de chave primária, e onde a coluna de carimbo de data/hora pode não ser atualizada para algumas linhas.
O parâmetro topic.prefix também é obrigatório e define o prefixo comum para todos os tópicos Kafka que o conector criará. O conector anexará o nome da tabela ou o nome da consulta ao prefixo para formar o nome completo do tópico. Por exemplo, se o prefixo for "test-" e o nome da tabela for "customers", o nome do tópico será "test-customers".
Como executar o conector no modo autônomo ou distribuído?
Para executar o conector no modo autônomo ou distribuído, você precisa usar diferentes comandos e arquivos de configuração. No modo autônomo, você pode executar o conector em um único nó de trabalho do Kafka Connect, usando um único arquivo de configuração que contém as propriedades do trabalho e do conector. No modo distribuído, você pode executar o conector em vários nós de trabalho do Kafka Connect, usando arquivos de configuração separados para o trabalho e as propriedades do conector. Você também precisa usar uma API REST ou uma ferramenta de linha de comando para enviar a configuração do conector ao cluster Kafka Connect.
Para executar o conector no modo autônomo, você pode seguir estas etapas:
Crie um arquivo de configuração que contenha as propriedades do operador e do conector. Por exemplo, jdbc-source-connector-standalone.properties.
Execute o seguinte comando no diretório de instalação do Kafka Connect:
bin/connect-standalone.sh config/connect-standalone.properties jdbc-source-connector-standalone.properties
Verifique os logs e os tópicos do Kafka para verificar se o conector está em execução e enviando dados.
Para executar o conector no modo distribuído, você pode seguir estas etapas:
Crie um arquivo de configuração que contenha apenas as propriedades do conector.Por exemplo, jdbc-source-connector-distributed.json.
Execute o seguinte comando de seu diretório de instalação do Kafka Connect para iniciar um nó de trabalho do Kafka Connect no modo distribuído:
bin/connect-distributed.sh config/connect-distributed.properties
Execute o seguinte comando de outro terminal para enviar a configuração do conector para o cluster Kafka Connect usando a API REST:
curl -X POST -H "Tipo de conteúdo: aplicativo/json" --data @jdbc-source-connector-distributed.json
Verifique os logs e os tópicos do Kafka para verificar se o conector está em execução e enviando dados.
Conclusão
Neste artigo, mostramos como baixar e configurar o conector de origem JDBC e como usá-lo para transmitir dados de um banco de dados para o Kafka. Também explicamos alguns dos benefícios e casos de uso desse conector, bem como alguns dos parâmetros e modos que você pode usar para personalizar seu comportamento. Esperamos que este artigo tenha ajudado você a entender como usar esse conector de maneira eficaz e eficiente.
Aqui estão algumas dicas e práticas recomendadas para usar o conector de origem JDBC:
Escolha um modo de consulta apropriado para seu banco de dados e dados. Por exemplo, se seu banco de dados oferece suporte a gatilhos ou captura de dados alterados (CDC), você pode usar os modos timestamp ou timestamp+incrementing para buscar apenas dados novos ou atualizados. Se seu banco de dados não oferece suporte a esses recursos, você pode usar os modos em massa ou de incremento para buscar todos os dados periodicamente.
Use listas brancas ou listas negras para filtrar tabelas ou esquemas indesejados. Por exemplo, se você deseja apenas copiar dados de determinadas tabelas, pode usar table.whitelist para especificá-los. Se você deseja excluir algumas tabelas, pode usar table.blacklist para especificá-las.
Use consultas personalizadas para transformar ou enriquecer dados antes de enviá-los ao Kafka.Por exemplo, se você deseja unir dados de várias tabelas, adicionar algumas colunas computadas ou filtrar algumas linhas, pode usar query para especificar uma consulta SQL personalizada que executa essas operações.
Ajuste o tamanho do lote e o intervalo de pesquisa de acordo com o volume de dados e os requisitos de latência. Por exemplo, se você tiver uma grande quantidade de dados ou requisitos de baixa latência, poderá usar um tamanho de lote menor e um intervalo de pesquisa mais curto para buscar dados com mais frequência. Se você tiver uma pequena quantidade de dados ou requisitos de alta latência, poderá usar um tamanho de lote maior e um intervalo de pesquisa mais longo para buscar dados com menos frequência.
Monitore o desempenho e o status do seu conector usando métricas e logs. Por exemplo, você pode usar métricas JMX ou Confluent Control Center para monitorar métricas como throughput, taxa de erro, atraso e deslocamento. Você também pode usar a API REST do Kafka Connect ou a CLI do Confluent para verificar o status, a configuração e as tarefas do seu conector. Você também pode usar os logs para solucionar quaisquer problemas ou erros que possam ocorrer.
Aqui estão alguns links para ler e aprender mais sobre o conector de origem JDBC:
perguntas frequentes
Aqui estão algumas perguntas frequentes sobre o conector de origem JDBC:
Como posso atualizar ou excluir dados do Kafka usando o conector de origem JDBC?
O conector de origem JDBC não oferece suporte à atualização ou exclusão de dados do Kafka. Ele suporta apenas a inserção de dados novos ou atualizados do banco de dados para o Kafka. Se você deseja atualizar ou excluir dados do Kafka, precisa usar outro conector ou ferramenta que suporte essas operações, como Kafka Connect SMTs, KSQL ou Kafka Streams.
Como posso manipular alterações de esquema no banco de dados usando o conector de origem JDBC?
O conector de origem JDBC pode manipular alterações de esquema no banco de dados automaticamente, desde que as alterações não afetem a chave primária ou as colunas de carimbo de data/hora usadas para modos de consulta. O conector detectará as alterações de esquema e atualizará o esquema dos registros de saída de acordo. No entanto, se as alterações de esquema afetarem a chave primária ou as colunas de carimbo de data/hora, será necessário reiniciar o conector com um novo arquivo de configuração que reflita as alterações.
Como posso proteger a conexão entre o conector de origem JDBC e o banco de dados usando SSL?
Para proteger a conexão entre o conector de origem JDBC e o banco de dados usando SSL, você precisa habilitar o SSL em seu servidor de banco de dados e configurar seu driver JDBC adequadamente. Você também precisa fornecer os certificados SSL e arquivos de armazenamento confiável para seu nó do trabalhador Kafka Connect e especificá-los em sua propriedade connection.url. Por exemplo, para MySQL, você pode usar estes parâmetros:
"connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root&useSSL=true&requireSSL=true&verifyServerCertificate=true&trustCertificateKeyStoreUrl=file:///path/to/truststore.jks&trustCertificateKeyStorePassword=truststorepassword"
Como posso filtrar ou transformar dados antes de enviá-los ao Kafka usando o conector de origem JDBC?
Para filtrar ou transformar dados antes de enviá-los ao Kafka usando o conector de origem JDBC, você pode usar uma destas opções:
Use uma consulta SQL personalizada em sua propriedade de consulta que executa as operações de filtragem ou transformação no lado do banco de dados.
Use um Kafka Connect Single Message Transform (SMT) em sua propriedade de transformações que executa as operações de filtragem ou transformação em seu lado do Kafka Connect.
Use um aplicativo KSQL ou Kafka Streams que consuma dados de seu tópico Kafka e execute as operações de filtragem ou transformação em seu lado de processamento de fluxo.
Como posso unir dados de várias tabelas usando o conector de origem JDBC?
Para unir dados de várias tabelas usando o conector de origem JDBC, você pode usar uma destas opções:
Use uma consulta SQL personalizada em sua propriedade de consulta que executa a operação de junção no lado do banco de dados.
Use várias instâncias do conector de origem JDBC, uma para cada tabela, e envie dados para diferentes tópicos Kafka. Em seguida, use um aplicativo KSQL ou Kafka Streams que consome dados desses tópicos e executa a operação de junção no lado do processamento do fluxo.
0517a86e26
Comments