Replication Kafka
Ir para navegação
Ir para pesquisar
#!/bin/bash
export URL="http://localhost:8080"
export CONTROLMD5=/opt/replicator-orchestrator/CONTROLMD5
export REPLICATORCONFIGTEMPLATE=/etc/kafka-connect-replicator/kafka-connect-replicator.properties.template
export REPLICATORCONFIG=/etc/kafka-connect-replicator/kafka-connect-replicator.properties
function GET_TOPICS() {
export DESTINATION="$1"
export COMPLEMENT="$2"
curl -X GET \
-H "replicator-destination: $DESTINATION" \
$URL/api/topic$COMPLEMENT 2>/dev/null
}
function GET_TOPICS_BY_WORKER() {
export DESTINATION="$1"
export COMPLEMENT="/worker/$2"
GET_TOPICS $DESTINATION "$COMPLEMENT" | \
jq -r '.[].name' | \
sort -u
}
function GET_TOPICS_MD5_BY_WORKER() {
export DESTINATION="$1"
export COMPLEMENT="/worker/$2"
GET_TOPICS $DESTINATION "$COMPLEMENT" | \
jq -r '.[].name' | \
sort -u | \
md5sum | \
awk '{ print $1 }'
}
function RESTART_REPLICATOR() {
echo Restarting service kafka-connect-replicator.service
systemctl restart kafka-connect-replicator.service
}
function ARQTEMP() {
case $1 in
create)
mktemp
;;
delete)
rm -fr $2
;;
esac
}
if [ $# -ne 2 ]
then
echo Not Enought Arguments
exit 1
fi
export CLUSTERNAME="$1"
export WROKERRNAME="$2"
export CONTROMD5DATA=$(cat $CONTROLMD5 )
export CURRENTMD5=$(GET_TOPICS_MD5_BY_WORKER $CLUSTERNAME $WROKERRNAME )
export TOPICS=$(GET_TOPICS_BY_WORKER $CLUSTERNAME $WROKERRNAME | tr '\n' ' ')
if [ "$CONTROMD5DATA" != "$CURRENTMD5" ]
then
echo $(date) -- Updating, STATE=$CONTROMD5DATA CURRENT=$CURRENTMD5
cat $REPLICATORCONFIGTEMPLATE | sed -e "s/^topic.whitelist.*/topic.whitelist: $TOPICS/g" > $REPLICATORCONFIG
RESTART_REPLICATOR
echo $CURRENTMD5 > $CONTROLMD5
else
echo $(date) -- No changes, STATE=$CONTROMD5DATA CURRENT=$CURRENTMD5
fi