Reading JSON append blobs from storage account #36
Comments
The problem here is the way append blobs grow compared with block blobs. When the plugin processes the file for the first time, it can read the full file and process it properly. The plugin was originally written with block blobs in mind; those hold growing JSON constructs, so the parsing has to take the header and the footer into account. Here you apparently have append blobs with json_lines that can grow, and this plugin doesn't handle that properly. It's probably not too hard to implement a new logtype that can deal with it, but I currently don't have much time on my hands. This plugin already depends on azure-storage-ruby, which supports append blobs, so it shouldn't require that much effort. https://github.com/Azure/azure-storage-ruby/blob/master/blob/lib/azure/storage/blob/append.rb
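To make the header/footer point concrete, here is a minimal Ruby sketch. It assumes the `{"records":[` head and `]}` tail that appear as file_head and file_tail in the plugin settings quoted in the error report, and it assumes the newly read bytes do not include the tail. `wrap_partial` is a hypothetical helper for illustration, not the plugin's actual code.

```ruby
require 'json'

# Assumed head and tail, taken from the file_head/file_tail settings in the report.
HEAD = '{"records":['
TAIL = ']}'

# Hypothetical helper: turn newly read bytes from a growing JSON blob into a
# standalone JSON document by dropping a leading comma and re-adding head and tail.
def wrap_partial(new_bytes, head: HEAD, tail: TAIL)
  body = new_bytes.sub(/\A\s*,/, '')
  head + body + tail
end

# Records added to a growing JSON document arrive as a comma-led fragment.
fragment = ',{"id":2},{"id":3}'
doc = JSON.parse(wrap_partial(fragment))
puts doc['records'].length

# json_lines content needs no head or tail: every line is already complete JSON.
lines = "{\"id\":4}\n{\"id\":5}\n"
events = lines.each_line.map { |line| JSON.parse(line) }
puts events.length
```

The contrast is the point: a fragment of a growing JSON document only parses once it is wrapped again, while appended json_lines can be parsed line by line as they are.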
I think append blobs work the same way as block blobs, which would mean that you may be able to set the head and tail to an empty string and it may work with json_lines. With a bit of luck I can test it over the weekend, and if that's the case I can catch the exception you have and suppress the head and tail learning, so this could work out of the box. But it's engineering, so hope tends to be quickly replaced by frantic troubleshooting.
Would that mean it would capture all the data in the append blob every time it's appended to, or is there a way to continue from the last read line? Thank you so much for looking into this, appreciate it 👍
From what I understand, append blobs use the same blocks, and if that's the case the plugin should be able to regularly check whether the file has grown and read the new bytes from the offset. But before that can work I have to make a workaround to avoid the InvalidBlobType error.
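The idea of checking for growth and reading only the new bytes can be sketched as follows. This is an illustration under assumptions, not the plugin's implementation: `OffsetReader` is a hypothetical class, and a plain string stands in for the blob content, whereas a real reader would request only the byte range after the stored offset.

```ruby
require 'json'

# Hypothetical reader: remembers how many bytes of a blob were already processed
# and only parses complete lines that were appended since then.
class OffsetReader
  attr_reader :offset

  def initialize
    @offset = 0
  end

  # `content` stands in for the blob's current bytes.
  def read_new(content)
    return [] if content.bytesize <= @offset

    chunk = content.byteslice(@offset, content.bytesize - @offset)
    # Only consume up to the last newline so a half-written line is retried later.
    last_newline = chunk.rindex("\n")
    return [] if last_newline.nil?

    complete = chunk[0..last_newline]
    @offset += complete.bytesize
    complete.each_line.reject { |line| line.strip.empty? }.map { |line| JSON.parse(line) }
  end
end

reader = OffsetReader.new
blob = "{\"n\":1}\n{\"n\":2}\n"
first = reader.read_new(blob)   # both complete records

blob += "{\"n\":3}\n{\"n\""     # one complete line plus a partial one
second = reader.read_new(blob)  # only the newly completed record
```

Advancing the offset only past the last newline means a record that is still being written is picked up on the next check instead of being parsed half-finished.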
I finally pushed 0.12.9 with support for append blobs. In the config you can set "append => true", but if you don't, the plugin will switch by itself when it hits the InvalidBlobType exception. I haven't done extensive testing, so feedback is welcome.
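The automatic switch described above could look roughly like the sketch below. Everything in it is hypothetical: `FakeBlobError` stands in for the HTTP error raised by the storage client, and `BlobReader` with its `read_blocks` and `read_append` methods is invented for illustration. It shows only the rescue-and-retry pattern, not the code shipped in 0.12.9.

```ruby
# Stand-in for the HTTP error raised by the storage client (hypothetical).
class FakeBlobError < StandardError; end

# Hypothetical reader that starts in block-blob mode and switches to append
# mode the first time a block-list request is rejected.
class BlobReader
  attr_reader :append

  def initialize(append: false)
    @append = append
  end

  def read(blob)
    @append ? read_append(blob) : read_blocks(blob)
  rescue FakeBlobError => e
    # Re-raise anything that is not the blob type error, or if already in append mode.
    raise unless e.message.include?('InvalidBlobType') && !@append

    @append = true
    retry
  end

  private

  # Simulates the block-list call failing on an append blob.
  def read_blocks(blob)
    if blob[:type] == :append
      raise FakeBlobError, 'InvalidBlobType (409): The blob type is invalid for this operation.'
    end

    "blocks:#{blob[:data]}"
  end

  def read_append(blob)
    "append:#{blob[:data]}"
  end
end

reader = BlobReader.new
result = reader.read({ type: :append, data: 'x' })
```

Setting the flag up front, as with "append => true" in the config, skips the failed first request; the fallback only saves users from having to know the blob type in advance.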
Hello,
Currently I have a diagnostic setting on my Azure Data Factory that sends pipeline/activity logs to a container in a storage account in the following format: "container/dir/dir/dir/y=2023/m=05/d=02/h=10/m=00/PT1H.json". For every hour, incoming logs from ADF get appended to one append blob (PT1H.json). So far, the input plugin works fine when reading historical blobs (not the current hour) and records the offsets of read blobs in the registry file. However, I'm running into an issue when reading the blob that is currently being appended to. Scenario: it's 12:00 and logs are written to the storage account > Logstash is running and picks up the new JSON file > at 12:05 new logs are appended to the same JSON file > I get the error below:
[INFO ] 2023-05-02 10:48:51.835 [[main]<azure_blob_storage] azureblobstorage - resuming from remote registry data/registry.dat
[ERROR] 2023-05-02 10:48:52.947 [[main]<azure_blob_storage] javapipeline - A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::AzureBlobStorage container=>"insights-logs-pipelineruns", codec=><LogStash::Codecs::JSONLines id=>"json_lines_cd27bbac-2203-44c5-9469-8925f1f88948", enable_metric=>true, charset=>"UTF-8", delimiter=>"\n">, interval=>10, id=>"f967be9e17d3af9286ab0875ce6754357103745f10dc7217b8275c3568271f9b", storageaccount=>"saeu2afglogpoc", access_key=>, enable_metric=>true, logtype=>"raw", dns_suffix=>"core.windows.net", registry_path=>"data/registry.dat", registry_create_policy=>"resume", addfilename=>false, addall=>false, debug_until=>0, debug_timer=>false, skip_learning=>false, file_head=>"{"records":[", file_tail=>"]}", path_filters=>["**/*"]>
Error: InvalidBlobType (409): The blob type is invalid for this operation.
RequestId:b063a660-f01e-0013-0ee3-7c48cd000000
Time:2023-05-02T10:48:52.8242660Z
Exception: Azure::Core::Http::HTTPError
Stack: /usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/http_request.rb:154:in `call'
org/jruby/RubyMethod.java:116:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/signer_filter.rb:28:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/http/http_request.rb:111:in `block in with_filter'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/service.rb:36:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/filtered_service.rb:34:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/core/signed_service.rb:41:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-common-2.0.4/lib/azure/storage/common/service/storage_service.rb:60:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-blob-2.0.3/lib/azure/storage/blob/blob_service.rb:179:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/azure-storage-blob-2.0.3/lib/azure/storage/blob/block.rb:276:in `list_blob_blocks'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:413:in `partial_read'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:271:in `block in run'
org/jruby/RubyHash.java:1519:in `each'
/usr/share/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-input-azure_blob_storage-0.12.7/lib/logstash/inputs/azure_blob_storage.rb:246:in `run'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:414:in `inputworker'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:405:in `block in start_input'

I guess what I'm trying to find out in short is if this plugin is able to read from append blobs. Please let me know if you need any further information on this matter. Thanks in advance.