Please add a function to search by key.
To determine the partition where a key is located, we use the Utils.murmur2 function. For this we wrote a simple library, partitionIdentifier.jar.
Using kafkactl, I determine the number of partitions in the topic and pass the key and the highest partition index (the partition count minus one) to the library. This tells me which partition holds the events for this key. I then search for events only in that partition.
This makes searching for the required messages much faster, because only one partition is read instead of all of them. The gain is largest in topics with more than 64 partitions.
partitionIdentifier.jar
package ru.msk;

import org.apache.kafka.common.utils.Utils;

public class Main {
    private static final String USAGE =
            "Please, enter the correct program arguments. It must be not-blank string 'Key' and number of partitions > 0.";

    public static void main(String[] args) {
        try {
            getPartitionByKey(args[0], args[1]);
        } catch (Exception e) {
            // Reached when fewer than two arguments are supplied.
            System.out.println(USAGE);
        }
    }

    // Prints the partition for the key, using the same formula as Kafka's default
    // partitioner for keyed records: positive murmur2 hash modulo partition count.
    public static void getPartitionByKey(String key, String numPartitions) {
        try {
            int partitions = Integer.parseInt(numPartitions);
            // kafkactlkey.sh passes the highest partition index (count - 1), so add 1 to get the count.
            partitions++;
            if (key != null && !key.isEmpty() && partitions > 0) {
                // Mask the sign bit so the hash is non-negative before taking the remainder.
                System.out.println((Utils.murmur2(Utils.utf8(key)) & 0x7fffffff) % partitions);
            } else {
                System.out.println(USAGE);
            }
        } catch (Exception e) {
            // Reached when numPartitions is not an integer.
            System.out.println(USAGE);
        }
    }
}
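The partition arithmetic in getPartitionByKey can be checked by hand without the jar. Below is a minimal shell sketch of only the mask-and-modulo step. It assumes the signed 32-bit murmur2 hash has already been computed elsewhere and does not compute murmur2 itself; the function name partition_for_hash is hypothetical.

```shell
#!/usr/bin/env bash
# partition_for_hash HASH COUNT
#   HASH  - signed 32-bit murmur2 value (assumed to be computed elsewhere)
#   COUNT - number of partitions in the topic
# Hypothetical helper: reproduces only the "& 0x7fffffff" and "%" steps.
partition_for_hash() {
    local hash=$1 count=$2
    # Clearing the sign bit turns a negative Java int into a non-negative
    # value, so the remainder always falls in the range 0..COUNT-1.
    echo $(( (hash & 0x7fffffff) % count ))
}

partition_for_hash -1 96    # prints 31
partition_for_hash 100 64   # prints 36
```

Without the mask, a negative hash would give a negative remainder, which is not a valid partition number.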
kafkactlkey.sh
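#!/usr/bin/env bash
JAR="/rtm/kafka/libs/partitionIdentifier.jar"
# Value that follows --keysearch (the pattern expects it to start and end with a digit).
KEY=$(echo "$*" | grep -Eo -e '--keysearch.[[:digit:]]+\S+[[:digit:]]' | awk '{print $2}')
# Topic name: the word right after "consume".
TOPIC=$(echo "$*" | grep -E 'consume\s[^ ]+\s' | awk '{print $2}')
# Original arguments with the --keysearch option removed.
STANDARTVAR=$(sed -r 's/--keysearch(.|\s+)\w+\S\w+//' <<< "$*")
# Non-empty when the caller supplied --from-timestamp.
TIMESTAMP=$(echo "$*" | grep -o -e '--from-timestamp')
if [ ! -f "$JAR" ]; then
    echo "File $JAR not found."
    exit 1
fi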
searchtokey () {
    if [ -z "$KEY" ]; then
        help
        exit 1
    fi
    # Read one message to check whether the topic uses keys at all.
    KEYEXAMPL=$(kafkactl consume "$TOPIC" -k -b --max-messages 1 --output yaml | grep 'key:' | awk '{print $2}')
    if [ -z "$KEYEXAMPL" ]; then
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        echo
        echo -en "\033[0;31m Topic $TOPIC does not contain keys, use the command: \033[0m \n"
        echo " kafkactl $STANDARTVAR | grep ${KEY}"
        echo
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        exit 1
    fi
    # The second column of `kafkactl get topics` is taken as the partition count.
    MAXPARTNUM=$(kafkactl get topics | grep -P "^$TOPIC " | awk '{print $2}')
    if [ -z "$MAXPARTNUM" ]; then
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        echo
        echo "Could not determine the number of partitions in topic $TOPIC, exiting"
        echo
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        exit 1
    fi
    # The jar expects the highest partition index and adds 1 itself.
    MAXPARTNUM=$((MAXPARTNUM - 1))
    CURRENTPARTNUM=$(java -cp "$JAR:/rtm/kafka/libs/*" ru.msk.Main "$KEY" "$MAXPARTNUM")
    if [ -z "$CURRENTPARTNUM" ]; then
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        echo
        echo "Could not determine the partition in topic $TOPIC for key $KEY, exiting"
        echo
        echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
        exit 1
    fi
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo " --keysearch $KEY found in partition $CURRENTPARTNUM of topic $TOPIC. First key in the topic $KEYEXAMPL"
    if [ -n "$TIMESTAMP" ]; then
        # The caller chose the starting point with --from-timestamp.
        ARGS="$STANDARTVAR -k --partitions $CURRENTPARTNUM"
        # echo " from timestamp"
    else
        ARGS="$STANDARTVAR -k --from-beginning --partitions $CURRENTPARTNUM"
        echo " from beginning"
    fi
    # Collapse runs of spaces left over after removing --keysearch.
    ARGS=$(sed -r 's/[ ]{2,}/ /g' <<< "$ARGS")
    # echo " kafkactl $ARGS | grep $KEY"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    # ARGS is intentionally unquoted so that it splits into separate arguments.
    kafkactl ${ARGS} | grep -- "${KEY}"
}
help () {
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo " --keysearch or its value was not specified, usage example:"
    echo " kafkactlkey.sh consume RTDM-INPUT --keysearch 7924777777"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
}
if [[ "$*" == *"--keysearch"* ]]; then
    # Key search requested
    searchtokey
else
    # No --keysearch given: print usage
    help
fi
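The regular expressions at the top of the script reliably handle only keys that start and end with a digit and are at least three characters long. As an alternative, the option could be picked out of the argument list with a plain loop. This is a sketch, not part of the original script; the function name split_keysearch and the variables KEYSEARCH and PASSTHROUGH are hypothetical.

```shell
#!/usr/bin/env bash
# split_keysearch ARGS...
# Sets two globals (both names are hypothetical):
#   KEYSEARCH   - the value given to --keysearch, or empty if absent
#   PASSTHROUGH - array of all other arguments, in their original order
split_keysearch() {
    KEYSEARCH=""
    PASSTHROUGH=()
    while [ $# -gt 0 ]; do
        case "$1" in
            --keysearch)
                # Form: --keysearch VALUE
                KEYSEARCH="${2-}"
                # Skip the value too, if one was given.
                if [ $# -ge 2 ]; then shift; fi
                ;;
            --keysearch=*)
                # Form: --keysearch=VALUE
                KEYSEARCH="${1#--keysearch=}"
                ;;
            *)
                # Everything else is passed through unchanged.
                PASSTHROUGH+=("$1")
                ;;
        esac
        shift
    done
}

split_keysearch consume RTDM-INPUT --from-timestamp 1705800000 --keysearch 79280483033
echo "key:  $KEYSEARCH"
echo "rest: ${PASSTHROUGH[*]}"
```

Keeping the remaining arguments in an array means they could later be expanded as "${PASSTHROUGH[@]}" without relying on word splitting, and keys containing letters or punctuation are accepted as well.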
Usage example
./kafkactlkey consume SDP-QUOTA-EVENTS --from-timestamp 1705800000 --keysearch 15654565
-------------------------------------------------------------------------------------------------------------------------------------------------
Topic SDP-QUOTA-EVENTS does not contain keys, use the command:
kafkactl consume SDP-QUOTA-EVENTS --from-timestamp 1705800000 | grep 15654565
-------------------------------------------------------------------------------------------------------------------------------------------------
./kafkactlkey consume RTDM-INPUT --from-timestamp 1705800000 --keysearch 79280483033
-------------------------------------------------------------------------------------------------------------------------------------------------
--keysearch 79280483033 found in partition 95 of topic RTDM-INPUT. First key in the topic "79289026353"
-------------------------------------------------------------------------------------------------------------------------------------------------
79280483033#{"SAPPN":"234242.170","MSISDN":"79280483033","EVAM_SCENARIO":"CAMP_0568_Smart","CAMPAIGN_CD":"CAMP2018850","M_REG_NAME":"Москва","UUID":"c396ec4d-4296-442d-bbeee-16da54a6e803","EVAM_EVENT":"RTDM_Response","RTDM_EVENT":"CAMP_0568_Smart - Приглашение в кампанию"}
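One edge case the examples above do not show: when partitionIdentifier.jar rejects its arguments, it prints an error sentence instead of a number, and a check that only tests for an empty result would accept that text as a partition. A minimal sketch of a stricter check follows; the function name is_partition_number is hypothetical and is not part of the original script.

```shell
#!/usr/bin/env bash
# is_partition_number VALUE COUNT
# Succeeds only if VALUE is a plain non-negative integer smaller than COUNT.
is_partition_number() {
    local value=$1 count=$2
    # Reject anything that is not made of digits only (e.g. an error message).
    [[ $value =~ ^[0-9]+$ ]] || return 1
    # Force base 10 so values with leading zeros are not read as octal.
    (( 10#$value < count ))
}

if is_partition_number "95" 96; then
    echo "95 is a valid partition"
fi
if ! is_partition_number "Please, enter the correct program arguments." 96; then
    echo "error text rejected"
fi
```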