Debugging Logstash

Running all of the Logstash pipelines in debug mode produces an overwhelming amount of output. Instead, you can stop Logstash and manually run an individual pipeline file (-f) from the command line:

/opt/elk/logstash/bin/logstash -f /opt/elk/logstash/config/ljr_testing.conf --log.level debug

This still produces a lot of output, but all of it is relevant to the pipeline that is experiencing a problem:

Using bundled JDK: /opt/elk/logstash/jdk
/opt/elk/logstash-8.13.1/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
/opt/elk/logstash-8.13.1/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
Sending Logstash logs to /opt/elk/logstash/logs which is now configured via log4j2.properties
[2024-10-23T10:47:36,364][INFO ][logstash.runner          ] Log4j configuration path used is: /opt/elk/logstash/config/log4j2.properties
[2024-10-23T10:47:36,369][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.13.1", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
[2024-10-23T10:47:36,371][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11]
[2024-10-23T10:47:36,376][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"fb_apache", :directory=>"/opt/elk/logstash/modules/fb_apache/configuration"}
[2024-10-23T10:47:36,376][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"fb_apache", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x14917e38 @directory="/opt/elk/logstash/modules/fb_apache/configuration", @module_name="fb_apache", @kibana_version_parts=["6", "0", "0"]>}
[2024-10-23T10:47:36,379][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"netflow", :directory=>"/opt/elk/logstash/modules/netflow/configuration"}
[2024-10-23T10:47:36,379][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"netflow", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x5909021c @directory="/opt/elk/logstash/modules/netflow/configuration", @module_name="netflow", @kibana_version_parts=["6", "0", "0"]>}
[2024-10-23T10:47:36,408][DEBUG][logstash.runner          ] Setting global FieldReference escape style: none
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] -------- Logstash Settings (* means modified) ---------
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] allow_superuser: true
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] node.name: "logstashhost.example.net"
[2024-10-23T10:47:36,606][DEBUG][logstash.runner          ] *path.config: "/opt/elk/logstash/config/ljr_testing.conf"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] path.data: "/opt/elk/logstash/data"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules.cli: #<Java::OrgLogstashUtil::ModulesSettingArray: []>
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_list: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_variable_list: []
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] modules_setup: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.test_and_exit: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.reload.automatic: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.reload.interval: #<Java::OrgLogstashUtil::TimeValue:0x4e8224cf>
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.support_escapes: false
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] config.field_reference.escape_style: "none"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] event_api.tags.illegal: "rename"
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] metric.collect: true
[2024-10-23T10:47:36,607][DEBUG][logstash.runner          ] pipeline.id: "main"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.system: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.workers: 4
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.batch.size: 125
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.batch.delay: 50
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.unsafe_shutdown: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.reloadable: true
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.plugin_classloaders: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.separate_logs: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.ordered: "auto"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] pipeline.ecs_compatibility: "v8"
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] path.plugins: []
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] config.debug: false
[2024-10-23T10:47:36,608][DEBUG][logstash.runner          ] *log.level: "debug" (default: "info")
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] version: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] help: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] enable-local-plugin-development: false
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] log.format: "plain"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.enabled: true
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *api.http.host: "10.166.25.148" (via deprecated `http.host`; default: "127.0.0.1")
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *http.host: "10.166.25.148"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *api.http.port: 9600..9600 (via deprecated `http.port`; default: 9600..9700)
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] *http.port: 9600..9600
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.environment: "production"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.auth.type: "none"
[2024-10-23T10:47:36,609][DEBUG][logstash.runner          ] api.auth.basic.password_policy.mode: "WARN"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.length.minimum: 8
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.upper: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.lower: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.digit: "REQUIRED"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.auth.basic.password_policy.include.symbol: "OPTIONAL"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.ssl.enabled: false
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] api.ssl.supported_protocols: []
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.type: "memory"
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.drain: false
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.page_capacity: 67108864
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.max_bytes: 1073741824
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.max_events: 0
[2024-10-23T10:47:36,610][DEBUG][logstash.runner          ] queue.checkpoint.acks: 1024
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.writes: 1024
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.interval: 1000
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] queue.checkpoint.retry: true
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.enable: false
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.max_bytes: 1073741824
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.flush_interval: 5000
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] dead_letter_queue.storage_policy: "drop_newer"
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.warn: #<Java::OrgLogstashUtil::TimeValue:0x3436da82>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.info: #<Java::OrgLogstashUtil::TimeValue:0x1001ede3>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.debug: #<Java::OrgLogstashUtil::TimeValue:0x41820105>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] slowlog.threshold.trace: #<Java::OrgLogstashUtil::TimeValue:0x2bb4b203>
[2024-10-23T10:47:36,611][DEBUG][logstash.runner          ] keystore.classname: "org.logstash.secret.store.backend.JavaKeyStore"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] keystore.file: "/opt/elk/logstash/config/logstash.keystore"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] *monitoring.cluster_uuid: "hosq-h4ESECe63l2DvgymA"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.queue: "/opt/elk/logstash/data/queue"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.dead_letter_queue: "/opt/elk/logstash/data/dead_letter_queue"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.settings: "/opt/elk/logstash/config"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] path.logs: "/opt/elk/logstash/logs"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.enabled: false
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.collection.interval: #<Java::OrgLogstashUtil::TimeValue:0x35f2b220>
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.collection.timeout_interval: #<Java::OrgLogstashUtil::TimeValue:0x296c863c>
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,612][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.sniffing: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.collection.pipeline.details.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.monitoring.collection.config.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.enabled: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.interval: #<Java::OrgLogstashUtil::TimeValue:0x6d5f9907>
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.timeout_interval: #<Java::OrgLogstashUtil::TimeValue:0x69d0fac0>
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.elasticsearch.sniffing: false
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.pipeline.details.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] monitoring.collection.config.enabled: true
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] node.uuid: ""
[2024-10-23T10:47:36,613][DEBUG][logstash.runner          ] xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.geoip.downloader.poll.interval: #<Java::OrgLogstashUtil::TimeValue:0x52e2ea7a>
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.geoip.downloader.enabled: true
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.enabled: false
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.logstash.poll_interval: #<Java::OrgLogstashUtil::TimeValue:0x6d5a43c1>
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.pipeline.id: ["main"]
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.username: "logstash_system"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.hosts: ["https://localhost:9200"]
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.cipher_suites: []
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.verification_mode: "full"
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] xpack.management.elasticsearch.sniffing: false
[2024-10-23T10:47:36,614][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2024-10-23T10:47:36,640][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-10-23T10:47:36,682][DEBUG][logstash.agent           ] Initializing API WebServer {"api.http.host"=>"10.166.25.148", "api.http.port"=>9600..9600, "api.ssl.enabled"=>false, "api.auth.type"=>"none", "api.environment"=>"production"}
[2024-10-23T10:47:36,790][DEBUG][logstash.api.service     ] [api-service] start
[2024-10-23T10:47:36,868][DEBUG][logstash.agent           ] Setting up metric collection
[2024-10-23T10:47:36,912][DEBUG][logstash.instrument.periodicpoller.os] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,121][DEBUG][logstash.instrument.periodicpoller.jvm] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,178][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2024-10-23T10:47:37,182][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2024-10-23T10:47:37,196][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,205][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,208][DEBUG][logstash.instrument.periodicpoller.flowrate] Starting {:polling_interval=>5, :polling_timeout=>120}
[2024-10-23T10:47:37,519][DEBUG][logstash.agent           ] Starting agent
[2024-10-23T10:47:37,530][DEBUG][logstash.agent           ] Starting API WebServer (puma)
[2024-10-23T10:47:37,586][DEBUG][logstash.config.source.local.configpathloader] Reading config file {:config_file=>"/opt/elk/logstash/config/ljr_testing.conf"}
[2024-10-23T10:47:37,596][DEBUG][logstash.agent           ] Trying to start API WebServer {:port=>9600, :ssl_enabled=>false}
[2024-10-23T10:47:37,657][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>1}
[2024-10-23T10:47:37,669][DEBUG][logstash.agent           ] Executing action {:action=>LogStash::PipelineAction::Create/pipeline_id:main}
[2024-10-23T10:47:37,683][DEBUG][org.logstash.secret.store.SecretStoreFactory] Attempting to exists or secret store with implementation: org.logstash.secret.store.backend.JavaKeyStore
[2024-10-23T10:47:37,758][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-10-23T10:47:38,172][INFO ][org.reflections.Reflections] Reflections took 86 ms to scan 1 urls, producing 132 keys and 468 values
[2024-10-23T10:47:38,191][DEBUG][org.logstash.secret.store.SecretStoreFactory] Attempting to exists or secret store with implementation: org.logstash.secret.store.backend.JavaKeyStore
[2024-10-23T10:47:38,219][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"generator", :type=>"input", :class=>LogStash::Inputs::Generator}
[2024-10-23T10:47:38,289][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"json", :type=>"codec", :class=>LogStash::Codecs::JSON}
[2024-10-23T10:47:38,318][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@id = "json_3f7d0e36-fb51-4b2a-993f-4287698b2b63"
[2024-10-23T10:47:38,319][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@enable_metric = true
[2024-10-23T10:47:38,319][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = "UTF-8"
[2024-10-23T10:47:38,346][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@count = 1
[2024-10-23T10:47:38,350][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@codec = <LogStash::Codecs::JSON id=>"json_3f7d0e36-fb51-4b2a-993f-4287698b2b63", enable_metric=>true, charset=>"UTF-8">
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@id = "ca89bb64fce3017f652b7d9e15198c67ecfbcea7435cac7e2316ed9f567a0820"
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@message = "{\"test\": \"data\"}"
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@enable_metric = true
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@add_field = {}
[2024-10-23T10:47:38,351][DEBUG][logstash.inputs.generator] config LogStash::Inputs::Generator/@threads = 1
[2024-10-23T10:47:38,392][DEBUG][logstash.plugins.registry] On demand adding plugin to the registry {:name=>"http", :type=>"output", :class=>LogStash::Outputs::Http}
[2024-10-23T10:47:38,405][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@id = "json_523f376a-5ed8-44dc-a17c-ff9f1daad51a"
[2024-10-23T10:47:38,406][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@enable_metric = true
[2024-10-23T10:47:38,406][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = "UTF-8"
[2024-10-23T10:47:38,413][ERROR][logstash.outputs.http    ] Unknown setting 'ssl_verification_mode' for http
[2024-10-23T10:47:38,420][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (ConfigurationError) Something is wrong with your configuration.", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.AbstractPipelineExt.initialize(AbstractPipelineExt.java:186)", "org.logstash.execution.AbstractPipelineExt$INVOKER$i$initialize.call(AbstractPipelineExt$INVOKER$i$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:847)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1319)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1292)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:30)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:446)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:92)", "org.jruby.RubyClass.newInstance(RubyClass.java:931)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.performIndirectCall(InvokeSite.java:735)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:657)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/pipeline_action/create.rb:49)", 
"opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/pipeline_action/create.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.performIndirectCall(InvokeSite.java:735)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:657)", "opt.elk.logstash_minus_8_dot_13_dot_1.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/opt/elk/logstash-8.13.1/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:144)", "org.jruby.RubyProc.call(RubyProc.java:352)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111)", "java.base/java.lang.Thread.run(Thread.java:840)"]}
[2024-10-23T10:47:38,448][DEBUG][logstash.agent           ] Shutting down all pipelines {:pipelines_count=>0}
[2024-10-23T10:47:38,449][DEBUG][logstash.agent           ] Converging pipelines state {:actions_count=>0}
[2024-10-23T10:47:38,452][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2024-10-23T10:47:38,457][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2024-10-23T10:47:38,458][DEBUG][logstash.instrument.periodicpoller.flowrate] Stopping
[2024-10-23T10:47:38,483][DEBUG][logstash.agent           ] API WebServer has stopped running
[2024-10-23T10:47:38,484][INFO ][logstash.runner          ] Logstash shut down.
[2024-10-23T10:47:38,493][FATAL][org.logstash.Logstash    ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:808) ~[jruby.jar:?]
        at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:767) ~[jruby.jar:?]
        at opt.elk.logstash.lib.bootstrap.environment.<main>(/opt/elk/logstash/lib/bootstrap/environment.rb:90) ~[?:?]
logstashhost:config #

Voilà, the error!

[2024-10-23T10:47:38,413][ERROR][logstash.outputs.http    ] Unknown setting 'ssl_verification_mode' for http

It turns out the ssl_verification_mode setting doesn't exist in the older version of Logstash that's running in prod. That's fine, though: without the setting, the output appears to be working and verifying the cert chain on the destination.
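When the debug output runs this long, it can help to filter it down to the ERROR and FATAL entries. The following is a minimal sketch, assuming the output was redirected to a file and uses the plain log format shown above. The class name LogstashErrorFilter and its errorsOnly method are hypothetical helpers, not part of Logstash.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LogstashErrorFilter {

    // Matches the plain log format: [timestamp][LEVEL][logger] message.
    // The level field is padded to five characters, e.g. "[WARN ]".
    private static final Pattern ERROR_OR_FATAL =
            Pattern.compile("^\\[[^\\]]+\\]\\[(ERROR|FATAL)\\s*\\]");

    // Returns only the lines that were logged at ERROR or FATAL level.
    public static List<String> errorsOnly(List<String> lines) {
        List<String> matches = new ArrayList<>();
        for (String line : lines) {
            if (ERROR_OR_FATAL.matcher(line).find()) {
                matches.add(line);
            }
        }
        return matches;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: LogstashErrorFilter <captured-output-file>");
            System.exit(1);
        }
        // args[0] is the file holding the captured debug output (an assumption).
        for (String line : errorsOnly(Files.readAllLines(Paths.get(args[0])))) {
            System.out.println(line);
        }
    }
}
```

Stack trace lines that follow a FATAL entry do not start with a bracketed timestamp, so this sketch drops them and keeps only the entry itself.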

Kafka Consumer and Producer Example – Java

Producer

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaProducer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaHost = args[0];
        String kafkaPort = args[1];
        String kafkaServer = kafkaHost + ":" + kafkaPort;
        String topicName = "LJRJavaTest";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaProducer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

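            // The truststore location must be a file path, so copy the classpath resource to a temporary file
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            tempTruststore.toFile().deleteOnExit(); // avoid leaving a copy of the truststore behind
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);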

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Define the date-time formatter
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd 'at' HH:mm:ss");

            try {
                for (int i = 0; i < 10; i++) {
                    String key = "Key-" + i;
                    
                    // Get the current timestamp
                    String timestamp = LocalDateTime.now().format(formatter);
                    
                    // Include the timestamp, Kafka host, and port in the message value
                    String value = String.format("Message-%d (Test from %s on server %s:%s)", i, timestamp, kafkaHost, kafkaPort);
                    
                    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
                    producer.send(record);
                    System.out.println("Sent message: (" + key + ", " + value + ")");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Please provide Kafka server and port. Usage: SimpleKafkaConsumer <kafka-server> <port>");
            System.exit(1);
        }

        String kafkaServer = args[0] + ":" + args[1];
        String topicName = "LJRJavaTest";
        String groupId = "test-consumer-group";
        String truststorePassword = "truststore-password";

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Load truststore from resources
        try (InputStream truststoreStream = SimpleKafkaConsumer.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (truststoreStream == null) {
                throw new RuntimeException("Truststore not found in resources");
            }

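            // The truststore location must be a file path, so copy the classpath resource to a temporary file
            java.nio.file.Path tempTruststore = java.nio.file.Files.createTempFile("kafka.client.truststore", ".jks");
            tempTruststore.toFile().deleteOnExit(); // avoid leaving a copy of the truststore behind
            java.nio.file.Files.copy(truststoreStream, tempTruststore, java.nio.file.StandardCopyOption.REPLACE_EXISTING);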

            props.put("security.protocol", "SSL");
            props.put("ssl.truststore.location", tempTruststore.toString());
            props.put("ssl.truststore.password", truststorePassword);

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topicName));

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("Consumed message: (key: %s, value: %s, offset: %d, partition: %d)%n",
                                record.key(), record.value(), record.offset(), record.partition());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Keystore for Trusts

The cert on my Kafka server is signed by a CA that has both a root and an intermediate signing certificate. To trust the chain, I need to populate rootCA.crt and intermediateCA.crt files with the corresponding base64-encoded certificates and import both files into a new trust store:

keytool -importcert -file rootCA.crt -keystore kafka.client.truststore.jks -alias rootCA -storepass truststore-password

keytool -importcert -file intermediateCA.crt -keystore kafka.client.truststore.jks -alias intermediateCA -storepass truststore-password

Then list certs in the trust store to verify the import was successful:

keytool -list -keystore kafka.client.truststore.jks -storepass truststore-password
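The same check can be done from Java, which also confirms the password used in the producer and consumer code opens the store. This is a minimal sketch, assuming the trust store is on the classpath under the name used above. TruststoreCheck and listAliases are hypothetical names chosen for illustration.

```java
import java.io.InputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TruststoreCheck {

    // Loads a keystore from the stream and returns its aliases, sorted.
    public static List<String> listAliases(InputStream in, char[] password) throws Exception {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        // load() throws if the data is not a readable keystore or the password does not match.
        keyStore.load(in, password);
        List<String> aliases = new ArrayList<>(Collections.list(keyStore.aliases()));
        Collections.sort(aliases);
        return aliases;
    }

    public static void main(String[] args) throws Exception {
        // Resource name and password are the ones used elsewhere in this post.
        try (InputStream in = TruststoreCheck.class.getResourceAsStream("/kafka.client.truststore.jks")) {
            if (in == null) {
                throw new IllegalStateException("Truststore not found in resources");
            }
            for (String alias : listAliases(in, "truststore-password".toCharArray())) {
                System.out.println(alias);
            }
        }
    }
}
```

If both imports worked, the output should contain one alias for the root CA and one for the intermediate CA.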

Then place the JKS trust store containing the CA certs in the project's resources directory:

C:.
│   pom.xml
│
├───src
│   └───main
│       ├───java
│       │   └───com
│       │       └───example
│       │               SimpleKafkaConsumer.java
│       │               SimpleKafkaProducer.java
│       │
│       └───resources
│               kafka.client.truststore.jks

Build and Run

Create a pom.xml for the build:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-producer-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin to ensure Java 8 compatibility -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Exec Maven Plugin to run the Java class -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <mainClass>com.example.SimpleKafkaProducer</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Build using `mvn clean package`

Run the producer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaProducer" -Dexec.args="kafkahost.example.net 9095"`

Run the consumer using `mvn exec:java -Dexec.mainClass="com.example.SimpleKafkaConsumer" -Dexec.args="kafkahost.example.net 9095"`

K8s, resolv.conf, and ndots

I had a very strange problem when firewalld was used with nftables as the back end – rules configured properly in firewalld didn’t exist in the nftables rulesets so … didn’t exist. The most obvious failure in the k8s cluster was DNS resolution – requests to any nodes where nftables was the back end just timed out. In diagnosing the “dns queries time out” issue, I was watching the logs from the coredns pods. And I saw a lot of NXDOMAIN errors. Not because I had a hostname mistyped or anything – each pod was appending every domain in the resolv.conf search order before trying the actual hostname.

Quick solution was to update our hostnames to include the trailing dot for the root zone. It is not “redishost.example.com” but rather “redishost.example.com.” with the dot on the end.

But that didn’t explain why – I’ve got plenty of Linux boxes where there are some search domains in resolv.conf. Never once seen redishost.example.com.example.com come across the query log. There is a configuration that I’ve rarely used that is designed to speed up getting to the search list. You can configure ndots – the default is one, but you can set whatever positive integer you would like. Surely, they wouldn’t set ndots to something crazy high … right??

Oh, look –

Defaulted container "kafka-streams-app" out of: kafka-streams-app, filebeat
bash-4.4# cat /etc/resolv.conf
search kstreams.svc.cluster.local svc.cluster.local cluster.local mgmt.example.net dsys.example.net dnoc.example.net admin.example.net example.com
nameserver 10.6.0.5
options ndots:5

Yup, it’s right there in the source — and it’s been there for seven years:

What does this mean? Well, ndots is really just a threshold on the number of dots in a hostname. If there are fewer than ndots dots, the resolver will try appending the search domains first and then try what you typed as a last resort. With ndots set to one, that basically means only a string with no dots will get the search domains appended first. I guess if you go out and register a gTLD for your company – my hostname is literally just example. – then you’ll have a little inefficiency as the search domains are tried. But that’s really an edge case. With the k8s default of five, anything with fewer than five dots gets all of those search domains appended first.
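
To make the ordering concrete, here is a rough sketch of the lookup order described above. It is a simplified model of the resolver, not its actual code, and the function name candidate_queries is something I made up for illustration. The search list is the one from the resolv.conf shown earlier.

```python
def candidate_queries(name, search_domains, ndots=1):
    """Return the names a resolver would try, in order (simplified model)."""
    # A trailing dot marks the name as fully qualified: the search list is skipped.
    if name.endswith("."):
        return [name]
    searched = [f"{name}.{domain}." for domain in search_domains]
    absolute = f"{name}."
    # Fewer dots than ndots: search domains first, the name as typed last.
    if name.count(".") < ndots:
        return searched + [absolute]
    # Enough dots: the name as typed goes first, search domains only as a fallback.
    return [absolute] + searched


if __name__ == "__main__":
    search = [
        "kstreams.svc.cluster.local", "svc.cluster.local", "cluster.local",
        "mgmt.example.net", "dsys.example.net", "dnoc.example.net",
        "admin.example.net", "example.com",
    ]
    # ndots:5 -> the real name is the last thing tried
    for query in candidate_queries("redishost.example.com", search, ndots=5):
        print(query)
    # ndots:1 -> the real name is the first thing tried
    print(candidate_queries("redishost.example.com", search, ndots=1)[0])
```

Each name in that list is looked up once for A and once for AAAA when IPv6 is enabled, so the number of queries is double the length of the list.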

So when I need redishost.example.com? I see the following resolutions fail first, because there is no such hostname:

[INFO] 64.24.29.155:57014 - # "A IN redishost.example.com.svc.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:57028 - # "AAAA IN redishost.example.com.svc.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:56096 - # "A IN redishost.example.com.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:56193 - # "AAAA IN redishost.example.com.cluster.local. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:55001 - # "A IN redishost.example.com.mgmt.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:55194 - # "AAAA IN redishost.example.com.mgmt.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:54078 - # "A IN redishost.example.com.dsys.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:54127 - # "AAAA IN redishost.example.com.dsys.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:52061 - # "A IN redishost.example.com.dnoc.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:52182 - # "AAAA IN redishost.example.com.dnoc.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:51018 - # "A IN redishost.example.com.admin.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:51104 - # "AAAA IN redishost.example.com.admin.example.net. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:50052 - # "A IN redishost.example.com.example.com. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s
[INFO] 64.24.29.155:50189 - # "AAAA IN redishost.example.com.example.com. udp # false 512" NXDOMAIN qr,aa,rd 158 0.00019419s

Wonderful — IPv6 is enabled and it’s trying AAAA records too. Finally it resolves redishost.example.com!

Luckily, there is a quick solution. Update the deployment YAML to include a custom ndots value – I like 1. I could see where someone might want two: if I need svc.cluster.local appended to something.else, maybe I don’t want to waste time looking up something.else as typed first. I don’t want to do that, but I could see why something higher than one might be desirable in k8s. Not sure I buy it’s awesome enough to be the default, though!
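
I didn’t include the YAML change itself, so here is a sketch of what it amounts to, written as a Python helper that patches a deployment manifest loaded as a dict. The helper name with_ndots and the sample manifest are made up for illustration; the assumption is that the setting goes in the pod spec’s dnsConfig.options list as an entry named ndots with a string value.

```python
import copy
import json


def with_ndots(deployment, ndots=1):
    """Return a copy of a Deployment manifest (as a dict) with ndots set."""
    patched = copy.deepcopy(deployment)
    # Walk down to spec.template.spec, creating levels that are missing.
    pod_spec = (
        patched.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
    )
    options = pod_spec.setdefault("dnsConfig", {}).setdefault("options", [])
    # Drop any existing ndots entry so the option is not listed twice.
    options[:] = [opt for opt in options if opt.get("name") != "ndots"]
    # The option value is written as a string, not an integer.
    options.append({"name": "ndots", "value": str(ndots)})
    return patched


if __name__ == "__main__":
    # Hypothetical, heavily trimmed manifest
    manifest = {
        "kind": "Deployment",
        "spec": {"template": {"spec": {"containers": [{"name": "kafka-streams-app"}]}}},
    }
    print(json.dumps(with_ndots(manifest, 1), indent=2))
```

The original manifest is left untouched, which makes it easy to diff the before and after.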

Redeployed and instantly cut the DNS traffic by about 90% — and reduced application latency as each DNS call no longer has to have fourteen failures before the final success.

Watching Redis Records for Strange Changes

I write a lot of things down to save myself time the next time I need to do the same sort of thing — and publish this to the Internet in case I can save someone else time too. But this one is so specific, I’m not sure it’s an “ever going to encounter this again” sort of thing. Just in case, though — I have device data being stored in redis — because the device doesn’t know its throughput values, you need the last time and last value paired with the current device metrics to calculate throughput. OK. But, sporadically, the cached data is updated insomuch as a new record is posted with a new timestamp. But the actual values, other than timestamp, remain unchanged. With millions of interfaces, it’s challenging to identify these situations by spot-checking the visualizations. Instead, I need to monitor redis and identify when the tstamp is updated but no other values change.

import redis
import time
import re
import json
import os

# Configuration
redis_host = 'redishost.example.net'
redis_port = 6379
redis_password = 'P@5sw0rDG03sH3r3'  # Replace with your Redis password
pattern = re.compile(r'INTERFACE_RAW_STATS_hostname\d\d\d\d_\d+_\d+')
output_file = 'changed_records.json'

# Connect to Redis
client = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)

# Dictionary to track records
records = {}
matching_keys = []

def get_matching_keys():
    """
    Retrieve keys from Redis matching the specified pattern.

    Returns:
        list: A list of keys that match the pattern.
    """
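    # SCAN walks the keyspace incrementally; KEYS blocks Redis while it reads every key
    all_keys = client.scan_iter(match='INTERFACE_RAW_STATS_hostname*', count=1000)
    matching_keys = [key for key in all_keys if pattern.match(key)]
    return matching_keys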

def process_keys():
    """
    Process Redis keys to track changes in data.

    Retrieves keys matching the pattern, gets their data using HGETALL,
    and tracks changes. If only the 'tstamp' field has changed and all
    other fields remain the same, the record is written to a file.
    """
    global records
    i = 0

    for key in matching_keys:
        i += 1
        data = client.hgetall(key)
        if i == 1 or i % 1000 == 0:
            print(f"Processed {i} records")

        if not data:
            continue

        collector_name = data.get('collectorName')
        node_id = data.get('nodeId')
        if_index = data.get('ifIndex')
        tstamp = data.get('tstamp')

        if not collector_name or not node_id or not if_index or not tstamp:
            continue

        unique_key = f"{collector_name}_{node_id}_{if_index}"

        if unique_key in records:
            previous_data = records[unique_key]
            if previous_data['tstamp'] != tstamp:
                # Check if all other values are the same (.get avoids a KeyError on a newly added field)
                if all(data[k] == previous_data.get(k) for k in data if k != 'tstamp'):
                    print(f"***** Record changed: {json.dumps(data, indent=2)} *****")
                    write_to_file(data)
            records[unique_key] = data  # Update the record
        else:
            records[unique_key] = data

def write_to_file(data):
    """
    Write the given data to a file.

    Args:
        data (dict): The data to write to the file.
    """
    with open(output_file, 'a') as file:
        file.write(json.dumps(data) + '\n')

if __name__ == "__main__":
    # Ensure the output file is empty at the start
    if os.path.exists(output_file):
        os.remove(output_file)

    # Retrieve the list of matching keys once
    matching_keys = get_matching_keys()

    while True:
        process_keys()
        print("Sleeping ... ")
        time.sleep(300)  # Sleep for 5 minutes
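
The comparison at the heart of that loop can also be pulled out into a small function that is easy to test without a Redis server. This is just a sketch: the name only_tstamp_changed and the inOctets field in the usage lines are made up for illustration. Unlike the inline check, it also treats a field that appears or disappears between polls as a real change.

```python
def only_tstamp_changed(previous, current, ignored=("tstamp",)):
    """True when the timestamp moved but every other field is identical."""
    # If the timestamp did not move, nothing was re-posted.
    if all(previous.get(field) == current.get(field) for field in ignored):
        return False
    prev_rest = {k: v for k, v in previous.items() if k not in ignored}
    cur_rest = {k: v for k, v in current.items() if k not in ignored}
    # Dict equality also catches fields that were added or removed.
    return prev_rest == cur_rest


if __name__ == "__main__":
    before = {"tstamp": "100", "ifIndex": "3", "inOctets": "5000"}
    stale = {"tstamp": "400", "ifIndex": "3", "inOctets": "5000"}
    fresh = {"tstamp": "400", "ifIndex": "3", "inOctets": "9000"}
    print(only_tstamp_changed(before, stale))  # True
    print(only_tstamp_changed(before, fresh))  # False
```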

Fedora 40: NFTables not logging

We upgraded Anya’s laptop to Fedora 40, and Skype has evidently moved from an installable RPM to a snap package. That didn’t work with the firewall rules we built earlier in the year (video and audio calls would not connect); and, worse, nothing was logged. Looks like the netfilter kernel logging isn’t enabled.

Enabled the logging:

echo 1 | sudo tee /proc/sys/net/netfilter/nf_log_all_netns

And, voila, we’ve got log records from nftables. And now Skype works … so I don’t know what to add. Sigh!

Azure DevOps Pipeline Error – Veracode Scan Fails

Pipeline Error:

Build Failed: Error: Exiting Veracode Upload and Scan Task: App not in state where new builds are allowed.

Resolution: There’s a scan in Veracode that never completed. Log into the web UI and delete it!

View the scans in the sandbox. Select the one that says “Request Incomplete”.

Use the ellipsis button to select “Delete Request”.

Confirm deletion.

Voila – now you can re-run the pipeline and the scan will proceed.

Kafka Streams, Consumer Groups, and Stickiness

The Java application I recently inherited had a lot of … quirks. One of the strangest was that it calculated throughput statistics based on ‘start’ values in a cache that was only refreshed every four hours. So at a minute past the data refresh, the throughput is averaged out over that minute. At three hours and fifty nine minutes past the data refresh, the throughput is averaged out over three hours and fifty nine minutes. In the process of correcting this (reading directly from the cached data rather than using an in-memory copy of the cached data), I noticed that the running application paused a lot as the Kafka group was re-balanced.

Which is especially odd because I’ve got a stable number of clients in each consumer group. But pods restart occasionally, and there was nothing done to attempt to stabilize partition assignment.

That was surprising, because Kafka has had mechanisms to reduce re-balancing for a long time – StickyAssignor was added in 0.11:

        // Set the partition assignment strategy to StickyAssignor
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

And the group instance ID (group.instance.id, used for static membership) was added in 2.3.0:

        // Set the group instance ID
        String groupInstanceId = UUID.randomUUID().toString();
        config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);

Now, I’m certain that a UUID isn’t the best way to go about crafting your group instance ID name … but it produces a “name” that isn’t likely to be duplicated. Since deploying this change, I went from seeing three or four re-balance operations an hour to zero.

Kafka Streams Group Members and Topic Partitions

I encountered an oddity in a Java application that uses Kafka Streams to implement a scalable application that reads data from Kafka topics. Data is broken out into multiple topics, and there are Kubernetes pods (“workers”) reading from each topic. The pods have different numbers of replicas defined. But it appears that no one ever aligned the topic partitions with the number of workers being deployed.

Kafka Streams assigns “work” to group members by partition. If you have ten partitions and five workers, each worker processes the data from two partitions. However, when the numbers don’t line up … some workers get more partitions than others. Were you to have eleven partitions and five workers, four workers would get data from two partitions and the fifth worker gets data from three.

Worse – in some cases we have more workers than partitions. Those extra workers are using up some resources, but they’re not actually processing data.
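
The arithmetic is simple enough to sketch. This only models how many partitions each worker ends up with when they are spread as evenly as possible; it says nothing about which partitions land where, and the function name partitions_per_worker is made up for illustration.

```python
def partitions_per_worker(partitions, workers):
    """Partition count per worker when spread as evenly as possible."""
    base, extra = divmod(partitions, workers)
    # The first `extra` workers each carry one partition more than the rest.
    return [base + 1 if i < extra else base for i in range(workers)]


if __name__ == "__main__":
    print(partitions_per_worker(10, 5))  # [2, 2, 2, 2, 2]
    print(partitions_per_worker(11, 5))  # [3, 2, 2, 2, 2]
    # More workers than partitions: the zeros are idle workers
    print(partitions_per_worker(3, 5))   # [1, 1, 1, 0, 0]
```

Any zero in the list is a worker that is running but has nothing to read.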

It’s a quick fix – partitions can be added mostly invisibly (the consumer group will be re-balanced, but write operations won’t really change; new data just starts getting placed in the new partitions), so I increased our partition counts to be 2x the number of workers. This allows us to add a few workers to a topic if it gets backlogged, but the configuration evenly distributes the work across all of the normally running pods.

Springfield

Governor DeWine had an editorial in the NY Times this morning — https://www.nytimes.com/2024/09/20/opinion/springfield-haitian-migrants-ohio.html — he starts out OK. Essentially I’ve got a history in that area, I know the area, and I know this nonsense to be untrue. But then he says he still supports Trump because of the horrible problems with immigration elsewhere.

Without mentioning that a bipartisan attempt to do something got scuttled by the very guy I am telling you will fix it.

Worse – without thinking maybe some of those reports are made up or embellished like this situation has been? If I know first-hand what is going on in Springfield, and I know that Trump’s portrayal of it is wildly inaccurate? Why wouldn’t I question how accurate his depiction of Colorado or even the Mexican border is?!?

They just let you steal!

This is another case of simplifying a complex situation to make someone look bad: Cali lets you walk right out if you are stealing $950 or less. And ignoring the real problem (if you want to have all of these crimes investigated and prosecuted, your local/state taxes need to go up so departments can staff accordingly and enough jails can be built to house all of these criminals).

Every state has a law that differentiates between misdemeanor theft and felony theft. e.g. https://law.justia.com/codes/arkansas/title-5/subtitle-4/chapter-36/subchapter-1/section-5-36-103/ for Arkansas at $1000. I think most people would agree that someone who steals ten dollars worth of merchandise and someone who steals three grand worth of merchandise should get different punishments. Punishments are generally defined along with the classification of the crime. And each state’s law reflects this. Different states have different dollar amounts — and $950 sounds like a lot of money. But compared to the other states? California’s demarcation is pretty middle of the pack.

Unfortunately the entire criminal justice system is overloaded. Police may be too busy to deal with your theft complaint. The prosecutor may not get around to filing charges. And what happens if they do get a conviction? Now we need to find somewhere to detain the thief. And, again, if there’s a person stealing cars, a person kidnapping minors, and a person stealing the latest video game … ideally, we’d have resources to punish all of them. But we don’t. And I’d be way more upset if the kidnapper skated because they had a task force down at the GameStop gathering video from all the surrounding stores to track down this thief.

Prop 47 absolutely has some flaws. There’s a Prop 36 this year that seems like an attempt to fix some of the unintended consequences — two or more theft charges under $950 would be a felony. Stealing from multiple places that add up to over $950 would be a felony. Won’t know if that passes until November, but it’s not like all the liberals are dancing around saying this was 100% perfect and everyone else should do it too.

 

Alabama: $500
Alaska: $750
Arizona: $1,000
Arkansas: $1,000
California: $950
Colorado: $2,000
Connecticut: $2,000
Delaware: $1,500
Florida: $750
Georgia: $1,500
Hawaii: $750
Idaho: $1,000
Illinois: $500
Indiana: $750
Iowa: $300
Kansas: $1,500
Kentucky: $1,000
Louisiana: $1,000
Maine: $1,000
Maryland: $1,500
Massachusetts: $1,200
Michigan: $1,000
Minnesota: $1,000
Mississippi: $1,000
Missouri: $750
Montana: $1,500
Nebraska: $1,500
Nevada: $1,200
New Hampshire: $1,000
New Jersey: $200
New Mexico: $500
New York: $1,000
North Carolina: $1,000
North Dakota: $1,000
Ohio: $1,000
Oklahoma: $1,000
Oregon: $1,000
Pennsylvania: $2,000
Rhode Island: $1,500
South Carolina: $2,000
South Dakota: $1,000
Tennessee: $1,000
Texas: $2,500
Utah: $1,500
Vermont: $900
Virginia: $1,000
Washington: $750
West Virginia: $1,000
Wisconsin: $2,500
Wyoming: $1,000