4 Commits

Author SHA1 Message Date
Daniel Huisman
97e71b2b5c Merge remote-tracking branch 'origin/feature-acme-v2' 2018-06-06 12:56:30 +02:00
Daniel Huisman
7842e09181 Improve the code I wrote when hungover, thanks to #7, closes #6 2018-06-06 12:56:08 +02:00
Daniel Huisman
9701cb9219 Start implementing ACME v2 support 2018-05-22 02:56:26 +02:00
Daniel Huisman
d94604591e Fix Docker logs, closes #5 2018-04-24 16:25:49 +02:00
2 changed files with 30 additions and 14 deletions

View File

@@ -15,4 +15,4 @@ RUN pip3 install -r requirements.txt
COPY . /app COPY . /app
# Define entrypoint of the app # Define entrypoint of the app
ENTRYPOINT ["python3", "extractor.py"] ENTRYPOINT ["python3", "-u", "extractor.py"]

View File

@@ -3,7 +3,6 @@ import os
import errno import errno
import time import time
import json import json
import argparse
from base64 import b64decode from base64 import b64decode
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
@@ -22,19 +21,38 @@ class Handler(FileSystemEventHandler):
# Read JSON file # Read JSON file
data = json.loads(open(event.src_path).read()) data = json.loads(open(event.src_path).read())
certs = data['DomainsCertificate']['Certs']
# Determine ACME version
acme_version = 2 if 'acme-v02' in data['Account']['Registration']['uri'] else 1
# Find certificates
if acme_version == 1:
certs = data['DomainsCertificate']['Certs']
elif acme_version == 2:
certs = data['Certificates']
# Loop over all certificates # Loop over all certificates
for c in certs: for c in certs:
if acme_version == 1:
name = c['Certificate']['Domain']
privatekey = c['Certificate']['PrivateKey']
fullchain = c['Certificate']['Certificate']
sans = c['Domains']['SANs']
elif acme_version == 2:
name = c['Domain']['Main']
privatekey = c['Key']
fullchain = c['Certificate']
sans = c['Domains']['SANs']
# Decode private key, certificate and chain # Decode private key, certificate and chain
privatekey = b64decode(c['Certificate']['PrivateKey']).decode('utf-8') privatekey = b64decode(privatekey).decode('utf-8')
fullchain = b64decode(c['Certificate']['Certificate']).decode('utf-8') fullchain = b64decode(fullchain).decode('utf-8')
start = fullchain.find('-----BEGIN CERTIFICATE-----', 1) start = fullchain.find('-----BEGIN CERTIFICATE-----', 1)
cert = fullchain[0:start] cert = fullchain[0:start]
chain = fullchain[start:] chain = fullchain[start:]
# Create domain directory if it doesn't exist # Create domain directory if it doesn't exist
directory = 'certs/' + c['Certificate']['Domain'] + '/' directory = 'certs/' + name + '/'
try: try:
os.makedirs(directory) os.makedirs(directory)
except OSError as error: except OSError as error:
@@ -57,15 +75,15 @@ class Handler(FileSystemEventHandler):
# Write private key, certificate and chain to flat files # Write private key, certificate and chain to flat files
directory = 'certs_flat/' directory = 'certs_flat/'
with open(directory + c['Certificate']['Domain'] + '.key', 'w') as f: with open(directory + name + '.key', 'w') as f:
f.write(privatekey) f.write(privatekey)
with open(directory + c['Certificate']['Domain'] + '.crt', 'w') as f: with open(directory + name + '.crt', 'w') as f:
f.write(fullchain) f.write(fullchain)
with open(directory + c['Certificate']['Domain'] + '.chain.pem', 'w') as f: with open(directory + name + '.chain.pem', 'w') as f:
f.write(chain) f.write(chain)
if c['Domains']['SANs']: if sans:
for name in c['Domains']['SANs']: for name in sans:
with open(directory + name + '.key', 'w') as f: with open(directory + name + '.key', 'w') as f:
f.write(privatekey) f.write(privatekey)
with open(directory + name + '.crt', 'w') as f: with open(directory + name + '.crt', 'w') as f:
@@ -73,11 +91,9 @@ class Handler(FileSystemEventHandler):
with open(directory + name + '.chain.pem', 'w') as f: with open(directory + name + '.chain.pem', 'w') as f:
f.write(chain) f.write(chain)
print('Extracted certificate for: ' + c['Domains']['Main'] + (', ' + ', '.join(c['Domains']['SANs']) if c['Domains']['SANs'] else '')) print('Extracted certificate for: ' + name + (', ' + ', '.join(sans) if sans else ''))
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Tool to extract Let\'s Encrypt certificates from Traefik\'s ACME storage file.')
# Determine path to watch # Determine path to watch
path = sys.argv[1] if len(sys.argv) > 1 else './data' path = sys.argv[1] if len(sys.argv) > 1 else './data'