
Add search by key (--keysearch) #187

Open
@anch665

Description

Please add a search-by-key option (--keysearch).

To determine the partition that holds a key, we use Kafka's Utils.murmur2 function.
We wrote a simple library for this, partitionIdentifier.jar.

Using kafkactl, I get the number of partitions in the topic and pass the key together with that number to the library, which tells me which partition holds the events for this key. I then search for the events only in that partition.
This makes finding specific messages much faster, especially in topics with more than 64 partitions, because only one partition has to be read instead of the whole topic.
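The partition lookup described above can be sketched without the kafka-clients dependency. The hypothetical `KeyPartition` class below is a standalone copy of the murmur2 variant in Kafka's `Utils.murmur2` (seed `0x9747b28c`, mixing constants `0x5bd1e995` and 24), followed by the same `& 0x7fffffff` and modulo step the jar uses. It assumes the messages were produced by a client that uses Kafka's default key hashing, that keys are strings serialized as UTF-8, and that the partition count has not changed since the messages were written. Calling `Utils.murmur2` from kafka-clients, as partitionIdentifier.jar does, remains the safer choice in production.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a standalone copy of the murmur2 variant used by
// Kafka's Java producer (org.apache.kafka.common.utils.Utils.murmur2).
public final class KeyPartition {

    private KeyPartition() {}

    public static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian order)
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes; the fall-through is intentional
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a string key: clear the sign bit of the hash, then take the
    // remainder modulo the partition count (the count itself, not count - 1).
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be > 0");
        }
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }
}
```

Note that `partitionFor` takes the partition count directly, so its second argument corresponds to the jar's second argument plus one.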

partitionIdentifier.jar

package ru.msk;

import org.apache.kafka.common.utils.Utils;

public class Main {
    private static final String USAGE = "Please, enter the correct program arguments. "
            + "It must be a non-blank string 'Key' and the highest partition index (number of partitions - 1, >= 0).";

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println(USAGE);
            return;
        }
        getPartitionByKey(args[0], args[1]);
    }

    // Prints the partition that Kafka's default partitioner assigns to `key`.
    // kafkactlkey.sh passes the number of partitions minus one, so 1 is added back here.
    public static void getPartitionByKey(String key, String maxPartitionIndex) {
        try {
            int partitions = Integer.parseInt(maxPartitionIndex) + 1;
            if (key != null && !key.trim().isEmpty() && partitions > 0) {
                // murmur2 of the UTF-8 key bytes, sign bit cleared, modulo the partition count
                System.out.println((Utils.murmur2(Utils.utf8(key)) & 0x7fffffff) % partitions);
            } else {
                System.out.println(USAGE);
            }
        } catch (NumberFormatException e) {
            System.out.println(USAGE);
        }
    }
}

kafkactlkey.sh

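JAR="/rtm/kafka/libs/partitionIdentifier.jar"

# Split the command line: the value after --keysearch goes to KEY,
# every other argument is passed through to kafkactl unchanged.
KEY=""
STANDARTVAR=""
NEXT_IS_KEY=0
for ARG in "$@"; do
  if [ "$NEXT_IS_KEY" -eq 1 ]; then
    KEY="$ARG"
    NEXT_IS_KEY=0
  elif [ "$ARG" = "--keysearch" ]; then
    NEXT_IS_KEY=1
  else
    STANDARTVAR="$STANDARTVAR $ARG"
  fi
done
STANDARTVAR="${STANDARTVAR# }"
TOPIC=$(echo "$STANDARTVAR" | grep -oE 'consume\s+[^ ]+' | awk '{print $2}')
TIMESTAMP=$(echo "$STANDARTVAR" | grep -o -- '--from-timestamp')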


if [ ! -f "$JAR" ]; then
  echo "File $JAR not found."
  exit 1
fi

searchtokey () {
  if [ -z "$KEY" ]; then
    help
    exit 1
  fi
  # Read one message to check whether the topic's messages carry keys at all
  KEYEXAMPL=$(kafkactl consume "$TOPIC" -k -b --max-messages 1 --output yaml | grep 'key:' | awk '{print $2}')
  if [ -z "$KEYEXAMPL" ]; then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo -en "\033[0;31m  Topic $TOPIC has no message keys, use this command instead:  \033[0m \n"
    echo "  kafkactl $STANDARTVAR | grep ${KEY}"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit 1
  fi
  # Number of partitions in the topic (second column of `kafkactl get topics`)
  MAXPARTNUM=$(kafkactl get topics | grep -P "^$TOPIC " | awk '{print $2}')
  if [ -z "$MAXPARTNUM" ]; then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo "Could not determine the number of partitions in topic $TOPIC, exiting"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit 1
  fi
  # partitionIdentifier.jar expects the highest partition index (count - 1) and adds 1 back
  MAXPARTNUM=$((MAXPARTNUM - 1))
  CURRENTPARTNUM=$(java -cp "$JAR:/rtm/kafka/libs/*" ru.msk.Main "$KEY" "$MAXPARTNUM")
  # The jar prints a usage message instead of a number on bad input
  if ! [[ "$CURRENTPARTNUM" =~ ^[0-9]+$ ]]; then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo "Could not determine the partition in topic $TOPIC for key $KEY, exiting"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit 1
  fi

  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
  echo
  echo "   --keysearch $KEY is located in partition $CURRENTPARTNUM of topic $TOPIC. First key in the topic: $KEYEXAMPL"
  if [ -n "$TIMESTAMP" ]; then
    # --from-timestamp was given: keep the user's start position
    ARGS="$STANDARTVAR -k --partitions $CURRENTPARTNUM"
  else
    # No start position given: read the partition from the beginning
    ARGS="$STANDARTVAR -k --from-beginning --partitions $CURRENTPARTNUM"
    echo "   from beginning"
  fi
  ARGS=$(sed -r 's/[ ]{2,}/ /g' <<< "$ARGS")
  echo
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"

  # ARGS is intentionally unquoted so it is split into separate kafkactl arguments
  kafkactl ${ARGS} | grep -- "$KEY"
}


help () {
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
  echo
  echo "   --keysearch or its value is not specified. Example usage:"
  echo "   kafkactlkey.sh consume RTDM-INPUT --keysearch 7924777777"
  echo
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
}

if [[ "$*" =~ "--keysearch" ]]; then
  # Search by key
  searchtokey
else
  # No --keysearch: show usage
  help
fi
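The argument handling in the script (take the value after `--keysearch`, pass everything else to kafkactl, check for `--from-timestamp`, then append `-k`, optionally `--from-beginning`, and `--partitions N`) is a small token-splitting step. The hypothetical `KeySearchArgs` class below sketches the same logic over the argument array instead of a regex on the joined command line; it only reproduces the flags the script already uses and does not validate them against kafkactl.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class mirroring the argument handling in kafkactlkey.sh.
public final class KeySearchArgs {
    public final String key;                // value after --keysearch, or null
    public final String topic;              // token after "consume", or null
    public final boolean fromTimestamp;     // true if --from-timestamp was given
    public final List<String> passThrough;  // all other arguments, in order

    private KeySearchArgs(String key, String topic, boolean fromTimestamp, List<String> passThrough) {
        this.key = key;
        this.topic = topic;
        this.fromTimestamp = fromTimestamp;
        this.passThrough = Collections.unmodifiableList(passThrough);
    }

    public static KeySearchArgs parse(String[] args) {
        String key = null;
        String topic = null;
        boolean fromTimestamp = false;
        List<String> rest = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            String a = args[i];
            if (a.equals("--keysearch")) {
                // Consume the flag and its value; neither is passed to kafkactl
                if (i + 1 < args.length) {
                    key = args[++i];
                }
                continue;
            }
            if (a.equals("consume") && topic == null && i + 1 < args.length) {
                topic = args[i + 1];
            }
            if (a.equals("--from-timestamp")) {
                fromTimestamp = true;
            }
            rest.add(a);
        }
        return new KeySearchArgs(key, topic, fromTimestamp, rest);
    }

    // kafkactl arguments for reading one partition, in the order the script builds them
    public List<String> kafkactlArgs(int partition) {
        List<String> out = new ArrayList<>(passThrough);
        out.add("-k");
        if (!fromTimestamp) {
            out.add("--from-beginning");
        }
        out.add("--partitions");
        out.add(Integer.toString(partition));
        return out;
    }
}
```

Working on an argument array avoids the quoting issues of re-parsing `$@` as a single string and accepts non-numeric keys.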

Usage example

 ./kafkactlkey consume SDP-QUOTA-EVENTS --from-timestamp 1705800000  --keysearch 15654565
-------------------------------------------------------------------------------------------------------------------------------------------------

  Topic SDP-QUOTA-EVENTS has no message keys, use this command instead:
  kafkactl consume SDP-QUOTA-EVENTS --from-timestamp 1705800000  | grep 15654565

-------------------------------------------------------------------------------------------------------------------------------------------------

./kafkactlkey consume RTDM-INPUT --from-timestamp 1705800000  --keysearch 79280483033
-------------------------------------------------------------------------------------------------------------------------------------------------

   --keysearch 79280483033 is located in partition 95 of topic RTDM-INPUT. First key in the topic: "79289026353"

-------------------------------------------------------------------------------------------------------------------------------------------------
79280483033#{"SAPPN":"234242.170","MSISDN":"79280483033","EVAM_SCENARIO":"CAMP_0568_Smart","CAMPAIGN_CD":"CAMP2018850","M_REG_NAME":"Москва","UUID":"c396ec4d-4296-442d-bbeee-16da54a6e803","EVAM_EVENT":"RTDM_Response","RTDM_EVENT":"CAMP_0568_Smart - Приглашение в кампанию"}
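The final `grep` in the script matches the key anywhere in a line, so keys that contain the searched key as a substring, or messages whose value mentions it, are also printed. A hypothetical stricter filter, assuming the `key#value` layout visible in the output above (the key, then `#`, then the value), compares only the text before the first `#`; keys that themselves contain `#` would need a different separator.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical filter: keeps only kafkactl output lines whose key equals the search key.
public final class ExactKeyFilter {

    private ExactKeyFilter() {}

    // True if the text before the first '#' equals `key` exactly.
    public static boolean matchesKey(String line, String key) {
        int sep = line.indexOf('#');
        return sep >= 0 && line.substring(0, sep).equals(key);
    }

    // Reads lines from stdin and prints those whose key matches args[0].
    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ExactKeyFilter <key>");
            System.exit(2);
        }
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                if (matchesKey(line, args[0])) {
                    System.out.println(line);
                }
            }
        }
    }
}
```

It would take the place of `grep` at the end of the pipeline, for example `kafkactl consume RTDM-INPUT --from-timestamp 1705800000 -k --partitions 95 | java ExactKeyFilter.java 79280483033` (single-file source launch, Java 11+).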


Labels: enhancement, help wanted
