Overview
KAFKA
설치/설정관련
server.properties
- 카프카 서버 설정 ( 클러스터들 주소, 주키퍼 주소 등 )
- broker.id 는 클러스터마다 바꿔줘야 함
zookeeper.properties
- 주키퍼 구동 관련 (포트 등 )
connect-distributed.properties
- connect 사용시 distribute 모드의 설정 ( 기본 connector 설정 및 cluster groupId 정도, 나머지는 REST API로 connerter마다 설정 )
log4j.properties
- kafka 로그 관련 connect-log4j.properties
주키퍼 기동
./bin/zkServer.sh start
./bin/zkServer.sh stop
카프카 기동
./bin/kafka-server-start.sh ./config/server.properties
nohup ./bin/kafka-server-start.sh ./config/server.properties &
카프카 커넥트 기동
./bin/kafka-server-stop.sh ./config/server.properties
CLI 명령
토픽 생성
./kafka-topics.sh --zookeeper {zookeeper-servers}:2181/{name} --replication-factor 1 --partitions 1 --topic {topic} --create
토픽에 Publish
./kafka-console-producer.sh --broker-list {kafka-servers}:9092 --topic {topic}
토픽을 consuming
./kafka-console-consumer.sh --bootstrap-server {kafka-servers}:9092 --topic {topic} --from-beginning
토픽정보 조회
./kafka-topics.sh —-bootstrap-server {kafka-servers}:9092 —-list
토픽특성확인
./kafka-topics.sh --zookeeper {kafka-servers}:2181/{name} --topic {topic} --describe
토픽설정 변경 (retain)
./kafka-configs.sh --zookeeper {kafka-servers}:2181/{name} --entity-type topics --entity-name {topic} --add-config "retention.ms=3600000" --alter
토픽설정 확인 (retain)
./kafka-configs.sh --zookeeper {kafka-servers}:2181/{name} --entity-type topics --entity-name {topic} --describe
토픽의 replica 재지정 ( rf.json 파일을 만들어서 처리해야 함 )
./kafka-reassign-partitions.sh --zookeeper {kafka-servers}:2181/{name} --reassignment-json-file ./rf.json --execute
rf.json
{
"version" : 1,
"partitions" : [
{ "topic" : "{topic}"
,"partition" : 0
,"replicas" : [1,2]
}
]
}
컨슈머 그룹 리스트 조회
./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --all-topics --list
컨슈머 그룹의 정보조회(offset, LAG 등)
./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --group {connect-group-name}--describe
컨슈머 그룹의 정보조회(offse 재지정)
./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --group {consumer-group-name} ——topic {topic} —-reset-offsets --to-latest --execute
// 특정 토픽의 시간조건(time -1, -2, timestamp)의 offset 정보 조회
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list aveditorspare01.ugedit.nfra.io:9092 --topic platform-api-apache-logs --time -1
KAFKA Connect
kafka connect distribute REST API
connector 목록조회
GET http://{kafka-connect-server}:8083/connectors
connector 등록
POST http://{kafka-connect-server}:8083/connectors
connector 상세내용 조회
GET http://{kafka-connect-server}:8083/connectors/{connector-name}
connector 수정
PUT http://{kafka-connect-server}:8083/connectors/{connector-name}
connector 삭제
DELETE http://{kafka-connect-server}:8083/connectors/{connector-name}
connector 상태조회
GET http://{kafka-connect-server}:8083/connectors/{connector-name}/status
0번째 테스크의 상태조회
GET http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/status
0번째 테스크 재시작
PUT http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/restart
0번째 테스크 정지
PUT http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/pause
Schema Registry
kafka registry 설정
여러 오픈소스가 존재하는 듯하지만 confluent걸 사용 (confluent 풀패키지를 받아야 함… )
압축해제 후
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
설정파일 조정 schema-registry.properties
주키퍼를 사용하는 방법이 있는듯하지만, kafka를 직접 이요하는 방법 사용을 위해 아래 부분 설정 ( 9092 카프카 port) kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
설정방식 조회 ( backward, forward, all )
http://localhost:8081/config
등록된 스키마 조회
http://localhost:8081/subjects
http://localhost:8081/subjects/{스키마이름}
http://localhost:8081/subjects/{스키마이름}/version/1
새로운 스키마로 전송할때마다, 버젼이 쌓임.
스키마파일 (id로 찾음)는 kafka에 _schemas 토픽에 담겨져있음.
설전변환
curl --header "Content-Type: application/json" -X PUT --data '{"compatibility": "FULL"}'
설정방식 특징
backward(기본) : 추가 default가 없으면 produce시 에러 , default가 null이 아닌값으로 지정되어야 함 (필드 삭제시는 괜찮음)
forward : 삭제시 default 없으면 에러 produce시 에러 (backward와 반대)
full : 필드추가, 필드삭제 모두 default가 고려 안되면 produce 못함
기타 팁
kafka connect 옵션
"errors.tolerance" : "all" <= 이 옵션을 줘야, task가 포맷이 이상한 데이터가 들어와도 죽지 않는다. ( 기본은 none )
토픽 삭제 관련
- 토픽삭제시 connect가 topic을 바라보고 있어서, 삭제 후 다시 생기는 문제점이 있음. => 토픽을 삭제해야한다면 kafka-connect를 중지시키고 삭제 후 재 생성 필요
JMX관련 설정
kafka 구동시 아래의 옵션을 추가해서 remote jmx port를 활성화시킴
export KAFKA_JMX_OPTS=’-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.rmi.port=1099’
JMX를 이용해서 KAFKA로그 레벨 변경하기
jconsole등으로 접근하여 kafka > kafka.Log4jController > setLogLevel ( kafka.request.logger ) 을 수정
카프카 Scale Out
- 카프카 확장은 카프카 브로커서버를 추가하고 ( id=4, id=5 클러스터 번호 추가) 투입하면 바로 사용가능 => but 기존 파티션들은 추가된 4,5번에 투입안되었으므로 reassign을 통해서 파티션 재구성 필요
주키퍼 앙상플 Scale Out
- 주키퍼가 추가되면, 주키퍼서버에 다른 클러스터 정보가 들어있으므로 기존 클러스터에 정보 추가 후 리붓 필요 ( 마스터를 가장 나중에 하고 하나씩 재기동 )
카프카 뒷단 구성
- NIFI <= 비쥬얼라이징된 데이터 파이프라인 구성 툴 ( connect sink, source 역할 가능 )
- 브루클린이라는 connect랑 비슷하지만 다른 kafka 큐로 보내는 구성에 특화된게 있는듯
카프카 모니터링
- burrow ( <= 링크드인에서 만든 lag 감시)
- 기본적으로 JMX로 데이터 수집하고, 처리함
카프카 KSQL
- SQL 문법처럼 Stream테이블을 생성하고, 토픽을 연결해서 변환하는 Stream 파이프라인을 만드는데 사용
- Confluent 패치키에 포함되어있음.
카프카 스트림
- Java로 Stream프로그램
- bulk로 받지 않고, 건건이 처리
kafka 클라이언트
- conductor 좋은듯 (유료됨)
- kafka manager 좀 아쉬운 점이 많음.
kafka-connect task 밸런싱 문제
connector 등록시 task를 10개로 해도 3대 노드 중 한군데서만 task파티션이 수행되는 이슈가 있음. 파티션수를 충분히 잡아서 여러 task에서 골고루 connect가 수행되도록 처리해야할 필요가 있음.
kafka consumer 옵션
fetch.min.bytes
- 한번에 가져올 수 있는 최소 데이터 사이즈 ( 데이터가 누적될 때까지 기다림)
fetch.max.wait.ms
- fetch.min.bytes 설정보다 적은 경우, 기다리는 최대시간
enable.auto.commit
- 백그라운드 자동 offset 커밋 auto.commit.interval.ms
auto.offset.reset
- earliest, latest, none 초기오프셋이 없을때 어떤 오프셋을 가져올지
session.timeout.ms heartbeat.interval.ms max.poll.records max.poll.interval.ms