Replication Kafka

De Wiki Clusterlab.com.br
Ir para navegação Ir para pesquisar

#!/bin/bash
export URL="http://localhost:8080"

export CONTROLMD5=/opt/replicator-orchestrator/CONTROLMD5

export REPLICATORCONFIGTEMPLATE=/etc/kafka-connect-replicator/kafka-connect-replicator.properties.template
export REPLICATORCONFIG=/etc/kafka-connect-replicator/kafka-connect-replicator.properties

function GET_TOPICS() {
    export DESTINATION="$1"
    export COMPLEMENT="$2"
    curl -X GET \
    -H "replicator-destination: $DESTINATION" \
    $URL/api/topic$COMPLEMENT 2>/dev/null
}
function GET_TOPICS_BY_WORKER() {
    export DESTINATION="$1"
    export COMPLEMENT="/worker/$2"
    GET_TOPICS $DESTINATION "$COMPLEMENT" | \
        jq  -r '.[].name' | \
        sort -u 
}
function GET_TOPICS_MD5_BY_WORKER() {
    export DESTINATION="$1"
    export COMPLEMENT="/worker/$2"
    GET_TOPICS $DESTINATION "$COMPLEMENT" | \
        jq  -r '.[].name' | \
        sort -u | \
        md5sum | \
        awk '{ print $1 }'
}
function RESTART_REPLICATOR() {
    echo Restarting service kafka-connect-replicator.service
    systemctl restart kafka-connect-replicator.service
}
function ARQTEMP() {
    case $1 in
        create)
            mktemp 
            ;;
        delete)
            rm -fr $2
            ;;
    esac
    }
if [ $# -ne 2 ]
then
    echo Not Enought Arguments
    exit 1
fi
export CLUSTERNAME="$1"
export WROKERRNAME="$2"
export CONTROMD5DATA=$(cat $CONTROLMD5 )
export CURRENTMD5=$(GET_TOPICS_MD5_BY_WORKER  $CLUSTERNAME $WROKERRNAME )
export TOPICS=$(GET_TOPICS_BY_WORKER  $CLUSTERNAME $WROKERRNAME | tr '\n' ' ')
if [ "$CONTROMD5DATA" != "$CURRENTMD5" ]
then
    echo $(date) -- Updating, STATE=$CONTROMD5DATA CURRENT=$CURRENTMD5
    cat $REPLICATORCONFIGTEMPLATE | sed -e  "s/^topic.whitelist.*/topic.whitelist: $TOPICS/g" > $REPLICATORCONFIG
    RESTART_REPLICATOR
    echo $CURRENTMD5 > $CONTROLMD5
else
    echo $(date) -- No changes, STATE=$CONTROMD5DATA CURRENT=$CURRENTMD5
fi