feedwarrior

Slim, portable tooling for creating and distributing decentralized append logs
git clone git://git.defalsify.org/logwarrior.git
Log | Files | Refs | README | LICENSE

commit 0efcd0922be0ee321a00170f59a67f5115b810c3
parent c2f664299d7c56bcd73fe977f08236b8b891e90c
Author: lash <dev@holbrook.no>
Date:   Fri,  7 Jun 2024 13:58:11 +0100

Add autoconvert of non-multipart to multipart for cmd entry

Diffstat:
Mfeedwarrior/cmd/entry.py | 9++++++++-
Mfeedwarrior/entry.py | 37++++++++++++++++++++++++++++++++++++-
Afeedwarrior/error.py | 2++
Mtest/test_file.py | 3++-
Atest/test_multi.py | 42++++++++++++++++++++++++++++++++++++++++++
5 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/feedwarrior/cmd/entry.py b/feedwarrior/cmd/entry.py @@ -11,6 +11,7 @@ import feedwarrior from feedwarrior import entry as feedentry from feedwarrior.adapters import fileadapter from feedwarrior.common import task_ids_to_uuids, check_task_uuids +from feedwarrior.error import NotMultipartError logg = logging.getLogger() @@ -36,7 +37,13 @@ def execute(config, feed, args): if args.task_uuid != None: task_uuids += check_task_uuids(config.task_dir, args.task_uuid) - entry = feedentry.from_multipart_file(args.path) + try: + entry = feedentry.from_multipart_file(args.path) + except NotMultipartError as e: + logg.info('{} is not a multipart message, will attempt to make one from it'.format(args.path)) + multipart_path = feedentry.to_multipart_file(args.path) + entry = feedentry.from_multipart_file(multipart_path) + for t in task_uuids: uu = feedwarrior.common.parse_uuid(t) entry.add_extension(feedwarrior.extension.TASKWARRIOR, uu) diff --git a/feedwarrior/entry.py b/feedwarrior/entry.py @@ -1,14 +1,20 @@ # standard imports +import os import email +import email.message +import email.utils import uuid import logging import base64 import enum import time import gzip +import tempfile +import datetime # local imports from .common import defaulthashers +from .error import NotMultipartError logg = logging.getLogger() @@ -61,6 +67,35 @@ class entry: 'payload': self.message.as_string(), } + +def to_multipart_file(path): + #d = tempfile.TemporaryDirectory() + #t = tempfile.NamedTemporaryFile(mode='w+', dir=d.name) + + basepath = os.path.basename(path) + outpath = '.' + basepath + '_multipart' + outpath = os.path.join(os.path.dirname(path), outpath) + f = open(path, 'r') + s = f.read() + f.close() + + env = email.message.EmailMessage() + env.add_header('Date', email.utils.formatdate()) + env.add_header('Content-Type', 'multipart/mixed') + env.add_attachment(s) + + for msg in env.iter_attachments(): + msg.add_header('Subject', 'Entry added ' + email.utils.formatdate(localtime=True)) + msg.replace_header('Content-Disposition', 'inline') + + f = open(outpath, 'w') + f.write(env.as_string()) + f.close() + + logg.debug("converted to multipath in " + outpath) + + return outpath + def from_multipart_file(filename, hashers=defaulthashers): @@ -76,7 +111,7 @@ def from_multipart_file(filename, hashers=defaulthashers): def from_multipart(m, hashers=defaulthashers): if not m.is_multipart(): - raise ValueError('{} is not a MIME multipart message'.format(filename)) + raise NotMultipartError('not a MIME multipart message') # the hasher calculates a uuid from the canonical order of the message contents # TODO: currently the canonical order is the order of items in the message. this should diff --git a/feedwarrior/error.py b/feedwarrior/error.py @@ -0,0 +1,2 @@ +class NotMultipartError(Exception): + pass diff --git a/test/test_file.py b/test/test_file.py @@ -39,8 +39,9 @@ class TestFileadapter(unittest.TestCase): e = entry(uu, msg) a.put(uu, e) - print('ieeeee') logg.debug(a.get(uu)) + + if __name__ == '__main__': unittest.main() diff --git a/test/test_multi.py b/test/test_multi.py @@ -0,0 +1,42 @@ +# standard imports +import os +import unittest +import tempfile +import logging +import uuid +import shutil + +# local imports +from feedwarrior.entry import to_multipart_file +from feedwarrior.entry import from_multipart_file + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class TestMultipart(unittest.TestCase): + + def setUp(self): + uu = uuid.uuid4() + self.feed_uuid = uu + self.path = tempfile.mkdtemp() + + + def tearDown(self): + shutil.rmtree(self.path) + + + def test_plain_file(self): + fp = os.path.join(self.path, 'plain.txt') + f = open(fp, 'w') + f.write("foo\nis bar\n") + f.close() + + fp_multi = to_multipart_file(fp) + + o = from_multipart_file(fp_multi) + print(o.serialize()) + + +if __name__ == '__main__': + unittest.main()