"""Watch a traefik ``acme.json`` file and extract its certificates to disk.

Whenever the watched file is created or modified, every certificate it
contains is decoded and written to the output directory, either as one
sub-directory per domain or (with ``--flat``) as flat files named after the
domain.  Optionally, docker containers labelled with one of the affected
domains are restarted afterwards.
"""
import sys
import os
import errno
import time
import json
# ``docker`` is an optional third-party dependency.  It is imported lazily in
# restartContainerWithDomain() so the extractor keeps working without it.
#import docker
import argparse
from argparse import ArgumentTypeError as err
from base64 import b64decode
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path


class PathType(object):
    """Callable for argparse's ``type=`` that validates a path argument."""

    def __init__(self, exists=True, type='file', dash_ok=True):
        '''Configure the validator.

        exists:
            True: a path that does exist
            False: a path that does not exist, in a valid parent directory
            None: don't care
        type: file, dir, symlink, None, or a function returning True for
            valid paths
            None: don't care
        dash_ok: whether to allow "-" as stdin/stdout
        '''
        assert exists in (True, False, None)
        assert type in ('file', 'dir', 'symlink', None) or hasattr(type, '__call__')

        self._exists = exists
        self._type = type
        self._dash_ok = dash_ok

    def __call__(self, string):
        """Validate ``string`` and return it unchanged.

        Raises:
            argparse.ArgumentTypeError: if the path violates the configured
                existence or type constraints.
        """
        if string == '-':
            # the special argument "-" means sys.std{in,out}
            if self._type == 'dir':
                raise err(
                    'standard input/output (-) not allowed as directory path')
            elif self._type == 'symlink':
                raise err(
                    'standard input/output (-) not allowed as symlink path')
            elif not self._dash_ok:
                raise err('standard input/output (-) not allowed')
            return string

        e = os.path.exists(string)
        if self._exists:
            if not e:
                raise err("path does not exist: '%s'" % string)

            if self._type is None:
                pass
            elif self._type == 'file':
                if not os.path.isfile(string):
                    raise err("path is not a file: '%s'" % string)
            elif self._type == 'symlink':
                # os.path has no ``symlink`` function; ``islink`` is the test.
                if not os.path.islink(string):
                    raise err("path is not a symlink: '%s'" % string)
            elif self._type == 'dir':
                if not os.path.isdir(string):
                    raise err("path is not a directory: '%s'" % string)
            elif not self._type(string):
                raise err("path not valid: '%s'" % string)
        else:
            # exists is False (must not exist) or None (don't care).
            if self._exists is not None and e:
                raise err("path exists: '%s'" % string)

            p = os.path.dirname(os.path.normpath(string)) or '.'
            # Check existence first: a missing parent is never a directory,
            # so the reverse order could never report "does not exist".
            if not os.path.exists(p):
                raise err("parent directory does not exist: '%s'" % p)
            elif not os.path.isdir(p):
                raise err("parent path is not a directory: '%s'" % p)

        return string


def restartContainerWithDomain(domains):
    """Restart every docker container labelled with one of ``domains``.

    Containers opt in through the label
    ``com.github.SnowMB.traefik-certificate-extractor.restart_domain`` whose
    value is a comma separated list of domain names.

    Args:
        domains: iterable of domain names whose certificates changed.
    """
    # Local import: docker is optional and only needed for this feature.
    import docker

    label = "com.github.SnowMB.traefik-certificate-extractor.restart_domain"
    # createCerts() returns a list; isdisjoint() needs a set.
    changed = set(domains)

    client = docker.from_env()
    container = client.containers.list(filters={"label": label})
    for c in container:
        restartDomains = [d.strip() for d in c.labels[label].split(',')]
        if not changed.isdisjoint(restartDomains):
            print('restarting container ' + c.id)
            c.restart()


def _write_text(path, text):
    """Write ``text`` to ``path`` (a ``pathlib.Path``), replacing any old file."""
    with path.open('w') as f:
        f.write(text)


def createCerts(file, directory, flat):
    """Extract all certificates from a traefik ACME JSON file.

    Args:
        file: path of the ``acme.json`` file to read.
        directory: output directory; created if it does not exist.
        flat: if true, write ``<domain>.key``, ``<domain>.crt`` and
            ``<domain>.chain.pem`` directly into ``directory`` (also once per
            SAN); otherwise write ``privkey.pem``, ``cert.pem``,
            ``chain.pem`` and ``fullchain.pem`` into ``directory/<domain>``.

    Returns:
        List with the main domain name of every extracted certificate.
    """
    # Read JSON file
    with open(file) as f:
        data = json.load(f)

    # Determine ACME version
    acme_version = 2 if 'acme-v02' in data['Account']['Registration']['uri'] else 1

    # Find certificates
    if acme_version == 1:
        certs = data['DomainsCertificate']['Certs']
    elif acme_version == 2:
        certs = data['Certificates']

    # The file may hold ``null`` until the first certificate is issued.
    certs = certs or []

    # Keep the output root separate from per-domain directories so that one
    # certificate's folder never becomes the parent of the next one.
    base_dir = Path(directory)

    # Loop over all certificates
    names = []
    for c in certs:
        if acme_version == 1:
            name = c['Certificate']['Domain']
            privatekey = c['Certificate']['PrivateKey']
            fullchain = c['Certificate']['Certificate']
            sans = c['Domains']['SANs']
        elif acme_version == 2:
            name = c['Domain']['Main']
            privatekey = c['Key']
            fullchain = c['Certificate']
            sans = c['Domain']['SANs']

        # Decode private key, certificate and chain
        privatekey = b64decode(privatekey).decode('utf-8')
        fullchain = b64decode(fullchain).decode('utf-8')
        # The second BEGIN marker separates the leaf certificate from the
        # chain; search from index 1 to skip the leaf's own marker.
        start = fullchain.find('-----BEGIN CERTIFICATE-----', 1)
        if start == -1:
            # Only one certificate present: there is no chain to split off.
            cert, chain = fullchain, ''
        else:
            cert, chain = fullchain[:start], fullchain[start:]

        # Create output directory if it doesn't exist
        base_dir.mkdir(parents=True, exist_ok=True)

        if flat:
            # Write private key, certificate and chain to flat files, once
            # for the main domain and once for every SAN.
            for domain in [name] + list(sans or []):
                _write_text(base_dir / (domain + '.key'), privatekey)
                _write_text(base_dir / (domain + '.crt'), fullchain)
                _write_text(base_dir / (domain + '.chain.pem'), chain)
        else:
            domain_dir = base_dir / name
            domain_dir.mkdir(exist_ok=True)

            # Write private key, certificate and chain to file
            _write_text(domain_dir / 'privkey.pem', privatekey)
            _write_text(domain_dir / 'cert.pem', cert)
            _write_text(domain_dir / 'chain.pem', chain)
            _write_text(domain_dir / 'fullchain.pem', fullchain)

        print('Extracted certificate for: ' + name +
              (', ' + ', '.join(sans) if sans else ''))
        names.append(name)
    return names


class Handler(FileSystemEventHandler):
    """Watchdog handler that re-extracts certificates when the file changes."""

    def __init__(self, args):
        """Store the parsed command line arguments (``argparse.Namespace``)."""
        super().__init__()
        self.args = args

    def on_created(self, event):
        self.handle(event)

    def on_modified(self, event):
        self.handle(event)

    def handle(self, event):
        """Extract certificates if ``event`` concerns the watched file."""
        # Check if it's a JSON file
        print('DEBUG : event fired')
        if not event.is_directory and event.src_path.endswith(str(self.args.certificate)):
            print('Certificates changed')

            domains = createCerts(
                event.src_path, self.args.directory, self.args.flat)

            if self.args.restart_container:
                restartContainerWithDomain(domains)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Extract traefik letsencrypt certificates.')
    parser.add_argument('-c', '--certificate', default='acme.json', type=PathType(
        exists=True), help='file that contains the traefik certificates (default acme.json)')
    parser.add_argument('-d', '--directory', default='.',
                        type=PathType(type='dir'), help='output folder')
    parser.add_argument('-f', '--flat', action='store_true',
                        help='outputs all certificates into one folder')
    parser.add_argument('-r', '--restart_container', action='store_true',
                        help='uses the docker API to restart containers that are labeled accordingly')
    args = parser.parse_args()

    print('DEBUG: watching path: ' + str(args.certificate))
    print('DEBUG: output path: ' + str(args.directory))

    # Create event handler and observer
    event_handler = Handler(args)
    observer = Observer()

    # Register the directory to watch
    observer.schedule(event_handler, str(Path(args.certificate).parent))

    # Main loop to watch the directory
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()